kafkaexporter

Version: v0.99.0
Published: Apr 22, 2024 License: Apache-2.0 Imports: 27 Imported by: 16

README

Kafka Exporter

Status
Stability: beta (traces, metrics, logs)
Distributions: core, contrib
Code Owners: @pavolloffay, @MovieStoreGuy

Kafka exporter exports logs, metrics, and traces to Kafka. It uses a synchronous producer that blocks and does not batch messages, so it should be used with the batch and queued retry processors for higher throughput and resiliency. Message payload encoding is configurable.

The following settings are required:

  • protocol_version (no default): Kafka protocol version, e.g. 2.0.0.

The following settings can be optionally configured:

  • brokers (default = localhost:9092): The list of Kafka brokers.
  • resolve_canonical_bootstrap_servers_only (default = false): Whether to resolve then reverse-lookup broker IPs during startup.
  • client_id (default = "sarama"): The client ID to configure the Sarama Kafka client with. The client ID will be used for all produce requests.
  • topic (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the Kafka topic to export to.
  • encoding (default = otlp_proto): The encoding of the payload sent to Kafka. All available encodings:
    • otlp_proto: payload is Protobuf serialized from ExportTraceServiceRequest if set as a traces exporter or ExportMetricsServiceRequest for metrics or ExportLogsServiceRequest for logs.
    • otlp_json: payload is JSON serialized from ExportTraceServiceRequest if set as a traces exporter or ExportMetricsServiceRequest for metrics or ExportLogsServiceRequest for logs.
    • The following encodings are valid only for traces.
      • jaeger_proto: the payload is serialized to a single Jaeger proto Span, and keyed by TraceID.
      • jaeger_json: the payload is serialized to a single Jaeger JSON Span using jsonpb, and keyed by TraceID.
      • zipkin_proto: the payload is serialized to Zipkin v2 proto Span.
      • zipkin_json: the payload is serialized to Zipkin v2 JSON Span.
    • The following encodings are valid only for logs.
      • raw: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded.
  • partition_traces_by_id (default = false): Configures the exporter to include the trace ID as the message key in trace messages sent to Kafka. Note: this setting has no effect on the Jaeger encodings, since they always use the trace ID as the message key.
  • auth
    • plain_text
      • username: The username to use.
      • password: The password to use.
    • sasl
      • username: The username to use.
      • password: The password to use.
      • mechanism: The SASL mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512, AWS_MSK_IAM or PLAIN)
      • version (default = 0): The SASL protocol version to use (0 or 1)
      • aws_msk.region: AWS Region in case of AWS_MSK_IAM mechanism
      • aws_msk.broker_addr: MSK Broker address in case of AWS_MSK_IAM mechanism
    • tls
      • ca_file: path to the CA cert. For a client this verifies the server certificate. Should only be used if insecure is set to false.
      • cert_file: path to the TLS cert to use for TLS required connections. Should only be used if insecure is set to false.
      • key_file: path to the TLS key to use for TLS required connections. Should only be used if insecure is set to false.
      • insecure (default = false): Disable verifying the server's certificate chain and host name (InsecureSkipVerify in the tls config)
      • server_name_override: ServerName indicates the name of the server requested by the client in order to support virtual hosting.
    • kerberos
      • service_name: Kerberos service name
      • realm: Kerberos realm
      • use_keytab: Whether to authenticate with a keytab instead of a password. If true, the keytab file is used and the password is ignored.
      • username: The Kerberos username used to authenticate with the KDC
      • password: The Kerberos password used to authenticate with the KDC
      • config_file: Path to the Kerberos configuration, e.g. /etc/krb5.conf
      • keytab_file: Path to the keytab file, e.g. /etc/security/kafka.keytab
  • metadata
    • full (default = true): Whether to maintain a full set of metadata. When disabled, the client does not make the initial request to the broker at startup.
    • retry
      • max (default = 3): The number of retries to get metadata
      • backoff (default = 250ms): How long to wait between metadata retries
  • timeout (default = 5s): The timeout for every attempt to send data to the backend.
  • retry_on_failure
    • enabled (default = true)
    • initial_interval (default = 5s): Time to wait after the first failure before retrying; ignored if enabled is false
    • max_interval (default = 30s): The upper bound on backoff; ignored if enabled is false
    • max_elapsed_time (default = 120s): The maximum amount of time spent trying to send a batch; ignored if enabled is false
  • sending_queue
    • enabled (default = true)
    • num_consumers (default = 10): Number of consumers that dequeue batches; ignored if enabled is false
    • queue_size (default = 1000): Maximum number of batches kept in memory before dropping data; ignored if enabled is false. Calculate this as num_seconds * requests_per_second, where:
      • num_seconds is the number of seconds to buffer in case of a backend outage
      • requests_per_second is the average number of requests per second.
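  • producer
    • max_message_bytes: The maximum message size, in bytes, that the producer will accept to produce.
    • required_acks (default = 1): The number of acknowledgements required before a message is considered sent: 0 (NoResponse), 1 (WaitForLocal), or -1 (WaitForAll).
    • compression: The compression codec used to produce messages. The options are none, gzip, snappy, lz4, and zstd.
    • flush_max_messages (default = 0): The maximum number of messages the producer will send in a single broker request; 0 means unlimited.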

Example configuration:

exporters:
  kafka:
    brokers:
      - localhost:9092
    protocol_version: 2.0.0

Documentation

Overview

Package kafkaexporter exports trace, metric, and log data to Kafka.

Functions

func NewFactory

func NewFactory(options ...FactoryOption) exporter.Factory

NewFactory creates a Kafka exporter factory.

Types

type Config

type Config struct {
	exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
	exporterhelper.QueueSettings   `mapstructure:"sending_queue"`
	configretry.BackOffConfig      `mapstructure:"retry_on_failure"`

	// The list of Kafka brokers (default localhost:9092)
	Brokers []string `mapstructure:"brokers"`

	// ResolveCanonicalBootstrapServersOnly makes Sarama do a DNS lookup for
	// each of the provided brokers. It will then do a PTR lookup for each
	// returned IP, and that set of names becomes the broker list. This can be
	// required in SASL environments.
	ResolveCanonicalBootstrapServersOnly bool `mapstructure:"resolve_canonical_bootstrap_servers_only"`

	// Kafka protocol version
	ProtocolVersion string `mapstructure:"protocol_version"`

	// ClientID to configure the Kafka client with. This can be leveraged by
	// Kafka to enforce ACLs, throttling quotas, and more.
	ClientID string `mapstructure:"client_id"`

	// The name of the Kafka topic to export to (default otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs)
	Topic string `mapstructure:"topic"`

	// Encoding of messages (default "otlp_proto")
	Encoding string `mapstructure:"encoding"`

	// PartitionTracesByID sets the message key of outgoing trace messages to the trace ID.
	// Please note: does not have any effect on Jaeger encoding exporters since Jaeger exporters include
	// trace ID as the message key by default.
	PartitionTracesByID bool `mapstructure:"partition_traces_by_id"`

	// Metadata is the namespace for metadata management properties used by the
	// Client, and shared by the Producer/Consumer.
	Metadata Metadata `mapstructure:"metadata"`

	// Producer is the namespace for producer properties used only by the Producer
	Producer Producer `mapstructure:"producer"`

	// Authentication defines the authentication mechanism used.
	Authentication kafka.Authentication `mapstructure:"auth"`
}

Config defines configuration for Kafka exporter.

func (*Config) Validate

func (cfg *Config) Validate() error

Validate checks if the exporter configuration is valid.
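
The source does not list which rules Validate enforces. As a rough sketch of what such a check could look like, the example below uses a hypothetical, trimmed-down exporterConfig type and two assumed rules: the required protocol_version must be set, and required_acks must be one of the three documented values.

```go
package main

import (
	"errors"
	"fmt"
)

// exporterConfig is a hypothetical stand-in for Config with only the
// fields this sketch checks.
type exporterConfig struct {
	ProtocolVersion string
	RequiredAcks    int
}

// validate applies two assumed rules: protocol_version is required, and
// required_acks must be -1, 0, or 1.
func (c exporterConfig) validate() error {
	if c.ProtocolVersion == "" {
		return errors.New("protocol_version is required")
	}
	if c.RequiredAcks < -1 || c.RequiredAcks > 1 {
		return fmt.Errorf("producer.required_acks must be -1, 0, or 1, got %d", c.RequiredAcks)
	}
	return nil
}

func main() {
	ok := exporterConfig{ProtocolVersion: "2.0.0", RequiredAcks: 1}
	fmt.Println(ok.validate()) // prints <nil>

	missing := exporterConfig{RequiredAcks: 1}
	fmt.Println(missing.validate()) // prints protocol_version is required
}
```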

type FactoryOption

type FactoryOption func(factory *kafkaExporterFactory)

FactoryOption applies changes to kafkaExporterFactory.

type KeyableTracesMarshaler added in v0.92.0

type KeyableTracesMarshaler interface {
	TracesMarshaler
	Key()
}

KeyableTracesMarshaler is an extension of the TracesMarshaler interface intended to provide partition key capabilities for trace messages.
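
The idea of keying trace messages can be sketched with stdlib types only. The message and span types below are hypothetical stand-ins, not the sarama or pdata types, and the hex-encoded key format is an assumption. The point is that messages sharing a key let a key-hashing partitioner send all spans of one trace to the same partition.

```go
package main

import (
	"encoding/hex"
	"fmt"
)

// message is a hypothetical stand-in for a producer message.
type message struct {
	Key   []byte
	Value []byte
}

// span is a hypothetical minimal span: a 16-byte trace ID and a payload.
type span struct {
	TraceID [16]byte
	Payload []byte
}

// keyByTraceID emits one message per span and uses the hex-encoded trace ID
// as the message key (the key encoding is an assumption of this sketch).
func keyByTraceID(spans []span) []message {
	msgs := make([]message, 0, len(spans))
	for _, s := range spans {
		msgs = append(msgs, message{
			Key:   []byte(hex.EncodeToString(s.TraceID[:])),
			Value: s.Payload,
		})
	}
	return msgs
}

func main() {
	id := [16]byte{0x01}
	msgs := keyByTraceID([]span{
		{TraceID: id, Payload: []byte("span-a")},
		{TraceID: id, Payload: []byte("span-b")},
	})
	for _, m := range msgs {
		fmt.Printf("key=%s value=%s\n", m.Key, m.Value)
	}
}
```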

type LogsMarshaler

type LogsMarshaler interface {
	// Marshal serializes logs into sarama's ProducerMessages
	Marshal(logs plog.Logs, topic string) ([]*sarama.ProducerMessage, error)

	// Encoding returns encoding name
	Encoding() string
}

LogsMarshaler marshals logs into Message array.
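
As an illustration of what a logs encoding does with a record, the sketch below follows the README's description of the raw encoding: a byte-array body is sent as is, and anything else is serialized to JSON. rawBody is a hypothetical helper that operates on plain Go values rather than plog types, so it shows the rule only, not the package's implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// rawBody mirrors the documented "raw" rule: byte-slice bodies pass through
// unchanged, every other body is serialized to JSON.
// Hypothetical helper working on plain Go values.
func rawBody(body any) ([]byte, error) {
	if b, ok := body.([]byte); ok {
		return b, nil
	}
	return json.Marshal(body)
}

func main() {
	b, _ := rawBody([]byte("already bytes"))
	fmt.Println(string(b)) // prints already bytes

	j, _ := rawBody(map[string]any{"msg": "hello"})
	fmt.Println(string(j)) // prints {"msg":"hello"}
}
```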

type Metadata

type Metadata struct {
	// Whether to maintain a full set of metadata for all topics, or just
	// the minimal set that has been necessary so far. The full set is simpler
	// and usually more convenient, but can take up a substantial amount of
	// memory if you have many topics and partitions. Defaults to true.
	Full bool `mapstructure:"full"`

	// Retry configuration for metadata.
	// This configuration is useful to avoid race conditions when broker
	// is starting at the same time as collector.
	Retry MetadataRetry `mapstructure:"retry"`
}

Metadata defines configuration for retrieving metadata from the broker.

type MetadataRetry

type MetadataRetry struct {
	// The total number of times to retry a metadata request when the
	// cluster is in the middle of a leader election or at startup (default 3).
	Max int `mapstructure:"max"`
	// How long to wait for leader election to occur before retrying
	// (default 250ms). Similar to the JVM's `retry.backoff.ms`.
	Backoff time.Duration `mapstructure:"backoff"`
}

MetadataRetry defines retry configuration for Metadata.

type MetricsMarshaler

type MetricsMarshaler interface {
	// Marshal serializes metrics into sarama's ProducerMessages
	Marshal(metrics pmetric.Metrics, topic string) ([]*sarama.ProducerMessage, error)

	// Encoding returns encoding name
	Encoding() string
}

MetricsMarshaler marshals metrics into Message array.

type Producer

type Producer struct {
	// Maximum message bytes the producer will accept to produce.
	MaxMessageBytes int `mapstructure:"max_message_bytes"`

	// RequiredAcks Number of acknowledgements required to assume that a message has been sent.
	// https://pkg.go.dev/github.com/IBM/sarama@v1.30.0#RequiredAcks
	// The options are:
	//   0 -> NoResponse. Doesn't send any response.
	//   1 -> WaitForLocal. Waits for only the local commit to succeed before responding (default).
	//   -1 -> WaitForAll. Waits for all in-sync replicas to commit before responding.
	RequiredAcks sarama.RequiredAcks `mapstructure:"required_acks"`

	// Compression Codec used to produce messages
	// https://pkg.go.dev/github.com/IBM/sarama@v1.30.0#CompressionCodec
	// The options are: 'none', 'gzip', 'snappy', 'lz4', and 'zstd'
	Compression string `mapstructure:"compression"`

	// The maximum number of messages the producer will send in a single
	// broker request. Defaults to 0 for unlimited. Similar to
	// `queue.buffering.max.messages` in the JVM producer.
	FlushMaxMessages int `mapstructure:"flush_max_messages"`
}

Producer defines configuration for producer.
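
The documented values for required_acks and compression can be checked with two small lookups. Both functions are hypothetical helpers written against the option lists in the comments above; they do not use or reproduce the sarama types.

```go
package main

import "fmt"

// describeAcks maps a required_acks value to the name documented for it.
func describeAcks(acks int) (string, error) {
	switch acks {
	case 0:
		return "NoResponse", nil
	case 1:
		return "WaitForLocal", nil
	case -1:
		return "WaitForAll", nil
	}
	return "", fmt.Errorf("unsupported required_acks %d", acks)
}

// validCompression reports whether name is one of the documented codecs.
func validCompression(name string) bool {
	switch name {
	case "none", "gzip", "snappy", "lz4", "zstd":
		return true
	}
	return false
}

func main() {
	name, _ := describeAcks(-1)
	fmt.Println(name)                       // prints WaitForAll
	fmt.Println(validCompression("zstd"))   // prints true
	fmt.Println(validCompression("brotli")) // prints false
}
```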

type TracesMarshaler

type TracesMarshaler interface {
	// Marshal serializes spans into sarama's ProducerMessages
	Marshal(traces ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error)

	// Encoding returns encoding name
	Encoding() string
}

TracesMarshaler marshals traces into Message array.

Directories

Path Synopsis
internal
