kafka_consumer

package
v0.0.0-...-0fce7b6
Warning

This package is not in the latest version of its module.

Published: Aug 26, 2019 License: MIT Imports: 11 Imported by: 0

README

Kafka Consumer Input Plugin

The Kafka consumer plugin reads from Kafka and creates metrics using one of the supported input data formats.

For old Kafka versions (< 0.8), please use the kafka_consumer_legacy input plugin, which uses the old Zookeeper connection method.

Configuration
[[inputs.kafka_consumer]]
  ## kafka servers
  brokers = ["localhost:9092"]
  ## topic(s) to consume
  topics = ["telegraf"]
  ## Add topic as tag if topic_tag is not empty
  # topic_tag = ""

  ## Optional Client id
  # client_id = "Telegraf"

  ## Set the minimal supported Kafka version.  Setting this enables the use of new
  ## Kafka features and APIs.  Of particular interest, lz4 compression
  ## requires at least version 0.10.0.0.
  ##   ex: version = "1.1.0"
  # version = ""

  ## Optional TLS Config
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## Optional SASL Config
  # sasl_username = "kafka"
  # sasl_password = "secret"

  ## the name of the consumer group
  consumer_group = "telegraf_metrics_consumers"
  ## Offset (must be either "oldest" or "newest")
  offset = "oldest"

  ## Maximum length of a message to consume, in bytes (default 0/unlimited);
  ## larger messages are dropped
  max_message_len = 1000000

  ## Maximum messages to read from the broker that have not been written by an
  ## output.  For best throughput set based on the number of metrics within
  ## each message and the size of the output's metric_batch_size.
  ##
  ## For example, if each message from the queue contains 10 metrics and the
  ## output metric_batch_size is 1000, setting this to 100 will ensure that a
  ## full batch is collected and the write is triggered immediately without
  ## waiting until the next flush_interval.
  # max_undelivered_messages = 1000

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"
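
The data_format setting selects the parser applied to each consumed message payload. As a rough sketch of what the plugin does with each message, the Go example below parses a single influx-line-protocol payload into Telegraf metrics; it assumes Telegraf's parsers package and its parsers.NewParser entry point, and the payload itself is invented for illustration.

package main

import (
	"fmt"
	"log"

	"github.com/influxdata/telegraf/plugins/parsers"
)

func main() {
	// Build the parser selected by data_format = "influx".
	parser, err := parsers.NewParser(&parsers.Config{DataFormat: "influx"})
	if err != nil {
		log.Fatal(err)
	}

	// A made-up Kafka message payload in influx line protocol.
	payload := []byte("cpu,host=server01 usage_idle=90.5 1566770400000000000")

	metrics, err := parser.Parse(payload)
	if err != nil {
		log.Fatal(err)
	}
	for _, m := range metrics {
		fmt.Println(m.Name(), m.Tags(), m.Fields())
	}
}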

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer interface {
	// Errors returns a read channel of errors encountered while consuming.
	Errors() <-chan error
	// Messages returns a read channel of messages consumed from Kafka.
	Messages() <-chan *sarama.ConsumerMessage
	// MarkOffset marks the message as processed so its offset can be committed.
	MarkOffset(msg *sarama.ConsumerMessage, metadata string)
	// Close shuts the consumer down and releases its resources.
	Close() error
}
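
Because Consumer is a narrow interface, it is straightforward to substitute a test double for a live broker connection. Below is a minimal sketch of a hypothetical in-memory implementation (the mockConsumer name and its channels are invented here; only the sarama message type is real):

package kafka_consumer

import "github.com/Shopify/sarama"

// mockConsumer is a hypothetical in-memory Consumer for tests.
type mockConsumer struct {
	msgs chan *sarama.ConsumerMessage
	errs chan error
}

// Errors returns the channel that test errors are injected on.
func (m *mockConsumer) Errors() <-chan error { return m.errs }

// Messages returns the channel that test messages are injected on.
func (m *mockConsumer) Messages() <-chan *sarama.ConsumerMessage { return m.msgs }

// MarkOffset is a no-op; a real consumer records the offset for commit.
func (m *mockConsumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {}

// Close shuts down both channels.
func (m *mockConsumer) Close() error {
	close(m.msgs)
	close(m.errs)
	return nil
}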

type Kafka

type Kafka struct {
	ConsumerGroup          string   `toml:"consumer_group"`
	ClientID               string   `toml:"client_id"`
	Topics                 []string `toml:"topics"`
	Brokers                []string `toml:"brokers"`
	MaxMessageLen          int      `toml:"max_message_len"`
	Version                string   `toml:"version"`
	MaxUndeliveredMessages int      `toml:"max_undelivered_messages"`
	Offset                 string   `toml:"offset"`
	SASLUsername           string   `toml:"sasl_username"`
	SASLPassword           string   `toml:"sasl_password"`
	TopicTag               string   `toml:"topic_tag"`

	tls.ClientConfig
	// contains filtered or unexported fields
}
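
The toml tags map the sample configuration above directly onto these fields, so the plugin can also be constructed in Go, as its own tests might do. A hedged sketch (the newPluginFromSample helper is invented; the import path is where this package lives in the Telegraf tree):

package example

import (
	"github.com/influxdata/telegraf/plugins/inputs/kafka_consumer"
)

// newPluginFromSample mirrors the sample configuration in Go.
func newPluginFromSample() *kafka_consumer.Kafka {
	return &kafka_consumer.Kafka{
		Brokers:                []string{"localhost:9092"},
		Topics:                 []string{"telegraf"},
		ConsumerGroup:          "telegraf_metrics_consumers",
		Offset:                 "oldest",
		MaxMessageLen:          1000000,
		MaxUndeliveredMessages: 1000,
	}
}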

func (*Kafka) Description

func (k *Kafka) Description() string

func (*Kafka) Gather

func (k *Kafka) Gather(acc telegraf.Accumulator) error

func (*Kafka) SampleConfig

func (k *Kafka) SampleConfig() string

func (*Kafka) SetParser

func (k *Kafka) SetParser(parser parsers.Parser)

func (*Kafka) Start

func (k *Kafka) Start(acc telegraf.Accumulator) error

func (*Kafka) Stop

func (k *Kafka) Stop()
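
Taken together, these methods follow Telegraf's service-input lifecycle: SetParser wires in the payload parser, Start connects to the brokers and begins consuming in the background, Gather satisfies the input interface (parsed metrics reach the accumulator asynchronously), and Stop shuts the consumer down. A minimal end-to-end sketch, assuming a reachable broker at localhost:9092 and Telegraf's testutil accumulator:

package main

import (
	"log"
	"time"

	"github.com/influxdata/telegraf/plugins/inputs/kafka_consumer"
	"github.com/influxdata/telegraf/plugins/parsers"
	"github.com/influxdata/telegraf/testutil"
)

func main() {
	k := &kafka_consumer.Kafka{
		Brokers:                []string{"localhost:9092"},
		Topics:                 []string{"telegraf"},
		ConsumerGroup:          "telegraf_metrics_consumers",
		Offset:                 "oldest",
		MaxUndeliveredMessages: 1000,
	}

	parser, err := parsers.NewParser(&parsers.Config{DataFormat: "influx"})
	if err != nil {
		log.Fatal(err)
	}
	k.SetParser(parser)

	acc := &testutil.Accumulator{}
	if err := k.Start(acc); err != nil {
		log.Fatal(err)
	}
	defer k.Stop()

	// Metrics arrive on the accumulator in the background; wait briefly,
	// then report how many were collected.
	time.Sleep(5 * time.Second)
	log.Printf("collected %d metrics", acc.NMetrics())
}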
