Kafka Exporter

The Kafka exporter exports traces and metrics to Kafka. It uses a synchronous producer that blocks and does not batch messages, so it should be used with the batch and queued retry processors for higher throughput and resiliency. The message payload encoding is configurable.

The following settings are required:

  • protocol_version (no default): Kafka protocol version e.g. 2.0.0

The following settings can be optionally configured:

  • brokers (default = localhost:9092): The list of kafka brokers
  • topic (default = otlp_spans for traces, otlp_metrics for metrics): The name of the kafka topic to export to.
  • encoding (default = otlp_proto): The encoding of the traces sent to kafka. All available encodings:
    • otlp_proto: payload is Protobuf serialized from ExportTraceServiceRequest if set as a traces exporter or ExportMetricsServiceRequest for metrics.
    • The following encodings are valid only for traces.
      • jaeger_proto: the payload is serialized to a single Jaeger proto Span, and keyed by TraceID.
      • jaeger_json: the payload is serialized to a single Jaeger JSON Span using jsonpb, and keyed by TraceID.
  • auth
    • plain_text
      • username: The username to use.
      • password: The password to use
    • sasl
      • username: The username to use.
      • password: The password to use
      • mechanism: The sasl mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512 or PLAIN)
    • tls
      • ca_file: path to the CA cert. For a client this verifies the server certificate. Should only be used if insecure is set to false.
      • cert_file: path to the TLS cert to use for TLS required connections. Should only be used if insecure is set to false.
      • key_file: path to the TLS key to use for TLS required connections. Should only be used if insecure is set to false.
      • insecure (default = false): Disable verifying the server's certificate chain and host name (InsecureSkipVerify in the tls config)
      • server_name_override: ServerName indicates the name of the server requested by the client in order to support virtual hosting.
    • kerberos
      • service_name: Kerberos service name
      • realm: Kerberos realm
      • use_keytab: If true, the keytab file is used for authentication instead of the password
      • username: The Kerberos username used to authenticate with the KDC
      • password: The Kerberos password used to authenticate with the KDC
      • config_file: Path to the Kerberos configuration, e.g. /etc/krb5.conf
      • keytab_file: Path to the keytab file, e.g. /etc/security/kafka.keytab
  • metadata
    • full (default = true): Whether to maintain a full set of metadata. When disabled, the client does not make the initial request to the broker at startup.
    • retry
      • max (default = 3): The number of retries to get metadata
      • backoff (default = 250ms): How long to wait between metadata retries
  • timeout (default = 5s): Timeout for each attempt to send data to the backend.
  • retry_on_failure
    • enabled (default = true)
    • initial_interval (default = 5s): Time to wait after the first failure before retrying; ignored if enabled is false
    • max_interval (default = 30s): Upper bound on the backoff; ignored if enabled is false
    • max_elapsed_time (default = 120s): Maximum amount of time spent trying to send a batch; ignored if enabled is false
  • sending_queue
    • enabled (default = true)
    • num_consumers (default = 10): Number of consumers that dequeue batches; ignored if enabled is false
    • queue_size (default = 5000): Maximum number of batches kept in memory before data is dropped; ignored if enabled is false. Calculate this as num_seconds * requests_per_second, where:
      • num_seconds is the number of seconds to buffer in case of a backend outage
      • requests_per_second is the average number of requests per second.
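
The queue_size rule above is plain arithmetic. A minimal sketch, assuming a hypothetical helper named queueSize (it is not part of the exporter) and illustrative traffic numbers:

```go
package main

import "fmt"

// queueSize is a hypothetical helper (not part of the exporter) that applies
// the documented sizing rule: num_seconds * requests_per_second.
func queueSize(numSeconds, requestsPerSecond int) int {
	return numSeconds * requestsPerSecond
}

func main() {
	// Buffer 60 seconds of a backend outage at an average of 50 requests/s.
	fmt.Println(queueSize(60, 50)) // prints 3000
}
```

With these assumed numbers the default of 5000 would already be large enough; a busier pipeline would need a larger value.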

Example configuration:

exporters:
  kafka:
    brokers:
      - localhost:9092
    protocol_version: 2.0.0






func ConfigureAuthentication

func ConfigureAuthentication(config Authentication, saramaConfig *sarama.Config) error

ConfigureAuthentication configures authentication in sarama.Config.

func NewFactory

func NewFactory(options ...FactoryOption) component.ExporterFactory

NewFactory creates Kafka exporter factory.


type Authentication

type Authentication struct {
	PlainText *PlainTextConfig            `mapstructure:"plain_text"`
	SASL      *SASLConfig                 `mapstructure:"sasl"`
	TLS       *configtls.TLSClientSetting `mapstructure:"tls"`
	Kerberos  *KerberosConfig             `mapstructure:"kerberos"`
}

Authentication defines authentication.

type Config

type Config struct {
	*config.ExporterSettings       `mapstructure:"-"`
	exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
	exporterhelper.QueueSettings   `mapstructure:"sending_queue"`
	exporterhelper.RetrySettings   `mapstructure:"retry_on_failure"`
	// The list of kafka brokers (default localhost:9092)
	Brokers []string `mapstructure:"brokers"`
	// Kafka protocol version
	ProtocolVersion string `mapstructure:"protocol_version"`
	// The name of the kafka topic to export to (default otlp_spans for traces, otlp_metrics for metrics)
	Topic string `mapstructure:"topic"`
	// Encoding of messages (default "otlp_proto")
	Encoding string `mapstructure:"encoding"`
	// Metadata is the namespace for metadata management properties used by the
	// Client, and shared by the Producer/Consumer.
	Metadata Metadata `mapstructure:"metadata"`
	// Authentication defines used authentication mechanism.
	Authentication Authentication `mapstructure:"auth"`
}

Config defines configuration for Kafka exporter.

func (*Config) Validate

func (cfg *Config) Validate() error

Validate checks if the exporter configuration is valid.

type FactoryOption

type FactoryOption func(factory *kafkaExporterFactory)

FactoryOption applies changes to kafkaExporterFactory.

func WithAddTracesMarshallers

func WithAddTracesMarshallers(encodingMarshaller map[string]TracesMarshaller) FactoryOption

WithAddTracesMarshallers adds tracesMarshallers.

type KerberosConfig

type KerberosConfig struct {
	ServiceName string `mapstructure:"service_name"`
	Realm       string `mapstructure:"realm"`
	UseKeyTab   bool   `mapstructure:"use_keytab"`
	Username    string `mapstructure:"username"`
	Password    string `mapstructure:"password" json:"-"`
	ConfigPath  string `mapstructure:"config_file"`
	KeyTabPath  string `mapstructure:"keytab_file"`
}

KerberosConfig defines Kerberos configuration.

type Message

type Message struct {
	Value []byte
	Key   []byte
}

Message encapsulates Kafka's message payload.

type Metadata

type Metadata struct {
	// Whether to maintain a full set of metadata for all topics, or just
	// the minimal set that has been necessary so far. The full set is simpler
	// and usually more convenient, but can take up a substantial amount of
	// memory if you have many topics and partitions. Defaults to true.
	Full bool `mapstructure:"full"`
	// Retry configuration for metadata.
	// This configuration is useful to avoid race conditions when broker
	// is starting at the same time as collector.
	Retry MetadataRetry `mapstructure:"retry"`
}

Metadata defines configuration for retrieving metadata from the broker.

type MetadataRetry

type MetadataRetry struct {
	// The total number of times to retry a metadata request when the
	// cluster is in the middle of a leader election or at startup (default 3).
	Max int `mapstructure:"max"`
	// How long to wait for leader election to occur before retrying
	// (default 250ms). Similar to the JVM's ``.
	Backoff time.Duration `mapstructure:"backoff"`
}

MetadataRetry defines retry configuration for Metadata.
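
To illustrate how Max and Backoff interact, here is a minimal sketch of a fixed-backoff retry loop. It is an assumption about the general pattern, not the client's actual implementation; metadataRetry, fetchWithRetry and errNotReady are hypothetical names introduced only for this example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errNotReady is a hypothetical error standing in for a failed metadata request.
var errNotReady = errors.New("broker not ready")

// metadataRetry mirrors the two documented MetadataRetry fields for this sketch.
type metadataRetry struct {
	Max     int
	Backoff time.Duration
}

// fetchWithRetry is a hypothetical helper: it calls fetch once, then retries
// up to r.Max more times, sleeping r.Backoff between attempts. It returns the
// number of attempts made and the last error.
func fetchWithRetry(r metadataRetry, fetch func() error) (attempts int, err error) {
	for attempts = 1; ; attempts++ {
		if err = fetch(); err == nil || attempts > r.Max {
			return attempts, err
		}
		time.Sleep(r.Backoff)
	}
}

func main() {
	calls := 0
	// Simulate a broker that becomes ready on the third request.
	fetch := func() error {
		calls++
		if calls < 3 {
			return errNotReady
		}
		return nil
	}
	n, err := fetchWithRetry(metadataRetry{Max: 3, Backoff: 10 * time.Millisecond}, fetch)
	fmt.Println(n, err) // prints "3 <nil>"
}
```

Under this reading, Max = 3 allows up to four requests in total: the initial one plus three retries.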

type MetricsMarshaller

type MetricsMarshaller interface {
	// Marshal serializes metrics into Messages
	Marshal(metrics pdata.Metrics) ([]Message, error)
	// Encoding returns encoding name
	Encoding() string
}

MetricsMarshaller marshals metrics into Message array.

type PlainTextConfig

type PlainTextConfig struct {
	Username string `mapstructure:"username"`
	Password string `mapstructure:"password"`
}

PlainTextConfig defines plaintext authentication.

type SASLConfig

type SASLConfig struct {
	// Username to be used on authentication
	Username string `mapstructure:"username"`
	// Password to be used on authentication
	Password string `mapstructure:"password"`
	// SASL Mechanism to be used, possible values are: (PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512).
	Mechanism string `mapstructure:"mechanism"`
}

SASLConfig defines the configuration for the SASL authentication.
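
Since Mechanism accepts only three values, a caller may want to reject anything else early. The following is a minimal sketch of such a check; saslConfig and checkMechanism are hypothetical names, and the exporter's own validation may differ.

```go
package main

import "fmt"

// saslConfig mirrors the documented SASLConfig fields for this sketch.
type saslConfig struct {
	Username  string
	Password  string
	Mechanism string
}

// checkMechanism is a hypothetical check that accepts only the three
// mechanisms listed in the documentation.
func checkMechanism(c saslConfig) error {
	switch c.Mechanism {
	case "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512":
		return nil
	default:
		return fmt.Errorf("unsupported SASL mechanism %q", c.Mechanism)
	}
}

func main() {
	fmt.Println(checkMechanism(saslConfig{Mechanism: "SCRAM-SHA-512"})) // prints "<nil>"
	fmt.Println(checkMechanism(saslConfig{Mechanism: "GSSAPI"}))        // prints an "unsupported SASL mechanism" error
}
```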

type TracesMarshaller

type TracesMarshaller interface {
	// Marshal serializes spans into Messages
	Marshal(traces pdata.Traces) ([]Message, error)
	// Encoding returns encoding name
	Encoding() string
}

TracesMarshaller marshals traces into Message array.
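
A custom encoding is a type with these two methods, registered under its encoding name. The sketch below shows the shape of such a type. To stay self-contained it uses local stand-ins: traces replaces pdata.Traces, tracesMarshaller replaces the interface above, and nameMarshaller with its "span_names" encoding is purely hypothetical.

```go
package main

import "fmt"

// Message mirrors the documented Message type.
type Message struct {
	Value []byte
	Key   []byte
}

// traces is a stand-in for pdata.Traces so the sketch compiles without the
// collector module.
type traces struct{ spanNames []string }

// tracesMarshaller mirrors the documented interface, using the stand-in type.
type tracesMarshaller interface {
	Marshal(t traces) ([]Message, error)
	Encoding() string
}

// nameMarshaller is a hypothetical marshaller that emits one unkeyed message
// per span, carrying only the span name.
type nameMarshaller struct{}

func (nameMarshaller) Marshal(t traces) ([]Message, error) {
	msgs := make([]Message, 0, len(t.spanNames))
	for _, n := range t.spanNames {
		msgs = append(msgs, Message{Value: []byte(n)})
	}
	return msgs, nil
}

func (nameMarshaller) Encoding() string { return "span_names" }

func main() {
	// A map keyed by encoding name, the shape WithAddTracesMarshallers takes.
	byEncoding := map[string]tracesMarshaller{}
	m := nameMarshaller{}
	byEncoding[m.Encoding()] = m

	msgs, _ := byEncoding["span_names"].Marshal(traces{spanNames: []string{"GET /", "db.query"}})
	fmt.Println(len(msgs), string(msgs[0].Value)) // prints "2 GET /"
}
```

With the real package, the map would hold TracesMarshaller values and be passed to WithAddTracesMarshallers when calling NewFactory.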

type XDGSCRAMClient

type XDGSCRAMClient struct {
	// ...
}

XDGSCRAMClient uses xdg-go scram to carry out the authentication conversation.

func (*XDGSCRAMClient) Begin

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)

Begin starts the XDGSCRAMClient conversation.

func (*XDGSCRAMClient) Done

func (x *XDGSCRAMClient) Done() bool

Done returns true if the conversation is completed or has errored.

func (*XDGSCRAMClient) Step

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error)

Step takes a string provided from a server (or just an empty string for the very first conversation step) and attempts to move the authentication conversation forward. It returns a string to be sent to the server or an error if the server message is invalid. Calling Step after a conversation completes is also an error.
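
The Begin, Step and Done methods are meant to be called in a loop by whoever drives the conversation. The sketch below shows one way such a loop could look. It is simplified and makes several assumptions: scramClient is a local interface listing the three methods, fakeClient is a test double that finishes after two steps without modeling real SCRAM messages, and converse and the server callback are hypothetical.

```go
package main

import "fmt"

// scramClient lists the three documented XDGSCRAMClient methods.
type scramClient interface {
	Begin(userName, password, authzID string) error
	Step(challenge string) (string, error)
	Done() bool
}

// fakeClient is a test double that completes after two steps; it does not
// produce real SCRAM messages.
type fakeClient struct{ steps int }

func (f *fakeClient) Begin(userName, password, authzID string) error {
	f.steps = 0
	return nil
}

func (f *fakeClient) Step(challenge string) (string, error) {
	if f.Done() {
		return "", fmt.Errorf("conversation already completed")
	}
	f.steps++
	return fmt.Sprintf("response-%d", f.steps), nil
}

func (f *fakeClient) Done() bool { return f.steps >= 2 }

// converse is a hypothetical driver: it starts the conversation, passes an
// empty string to the first Step, then feeds each server reply into the next
// Step until Done reports true. It returns every response sent.
func converse(c scramClient, user, pass string, server func(string) string) ([]string, error) {
	if err := c.Begin(user, pass, ""); err != nil {
		return nil, err
	}
	var sent []string
	challenge := "" // the very first step takes an empty string
	for !c.Done() {
		resp, err := c.Step(challenge)
		if err != nil {
			return sent, err
		}
		sent = append(sent, resp)
		challenge = server(resp)
	}
	return sent, nil
}

func main() {
	reply := func(resp string) string { return "challenge-for-" + resp }
	sent, err := converse(&fakeClient{}, "user", "secret", reply)
	fmt.Println(sent, err) // prints "[response-1 response-2] <nil>"
}
```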