kafka_consumer

package
v1.24.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 12, 2022 License: MIT Imports: 13 Imported by: 186

README

Kafka Consumer Input Plugin

The Kafka consumer plugin reads from Kafka and creates metrics using one of the supported input data formats.

For old kafka version (< 0.8), please use the kafka_consumer_legacy input plugin and use the old zookeeper connection method.

Configuration

# Read metrics from Kafka topics
[[inputs.kafka_consumer]]
  ## Kafka brokers.
  brokers = ["localhost:9092"]

  ## Topics to consume.
  topics = ["telegraf"]

  ## When set this tag will be added to all metrics with the topic as the value.
  # topic_tag = ""

  ## Optional Client id
  # client_id = "Telegraf"

  ## Set the minimal supported Kafka version.  Setting this enables the use of new
  ## Kafka features and APIs.  Must be 0.10.2.0 or greater.
  ##   ex: version = "1.1.0"
  # version = ""

  ## Optional TLS Config
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## SASL authentication credentials.  These settings should typically be used
  ## with TLS encryption enabled
  # sasl_username = "kafka"
  # sasl_password = "secret"

  ## Optional SASL:
  ## one of: OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI
  ## (defaults to PLAIN)
  # sasl_mechanism = ""

  ## used if sasl_mechanism is GSSAPI (experimental)
  # sasl_gssapi_service_name = ""
  # ## One of: KRB5_USER_AUTH and KRB5_KEYTAB_AUTH
  # sasl_gssapi_auth_type = "KRB5_USER_AUTH"
  # sasl_gssapi_kerberos_config_path = "/"
  # sasl_gssapi_realm = "realm"
  # sasl_gssapi_key_tab_path = ""
  # sasl_gssapi_disable_pafxfast = false

  ## used if sasl_mechanism is OAUTHBEARER (experimental)
  # sasl_access_token = ""

  ## SASL protocol version.  When connecting to Azure EventHub set to 0.
  # sasl_version = 1

  # Disable Kafka metadata full fetch
  # metadata_full = false

  ## Name of the consumer group.
  # consumer_group = "telegraf_metrics_consumers"

  ## Compression codec represents the various compression codecs recognized by
  ## Kafka in messages.
  ##  0 : None
  ##  1 : Gzip
  ##  2 : Snappy
  ##  3 : LZ4
  ##  4 : ZSTD
  # compression_codec = 0
  ## Initial offset position; one of "oldest" or "newest".
  # offset = "oldest"

  ## Consumer group partition assignment strategy; one of "range", "roundrobin" or "sticky".
  # balance_strategy = "range"

  ## Maximum length of a message to consume, in bytes (default 0/unlimited);
  ## larger messages are dropped
  max_message_len = 1000000

  ## Maximum messages to read from the broker that have not been written by an
  ## output.  For best throughput set based on the number of metrics within
  ## each message and the size of the output's metric_batch_size.
  ##
  ## For example, if each message from the queue contains 10 metrics and the
  ## output metric_batch_size is 1000, setting this to 100 will ensure that a
  ## full batch is collected and the write is triggered immediately without
  ## waiting until the next flush_interval.
  # max_undelivered_messages = 1000

  ## Maximum amount of time the consumer should take to process messages. If
  ## the debug log prints messages from sarama about 'abandoning subscription
  ## to [topic] because consuming was taking too long', increase this value to
  ## longer than the time taken by the output plugin(s).
  ##
  ## Note that the effective timeout could be between 'max_processing_time' and
  ## '2 * max_processing_time'.
  # max_processing_time = "100ms"

  ## The default number of message bytes to fetch from the broker in each
  ## request (default 1MB). This should be larger than the majority of
  ## your messages, or else the consumer will spend a lot of time
  ## negotiating sizes and not actually consuming. Similar to the JVM's
  ## `fetch.message.max.bytes`.
  # consumer_fetch_default = "1MB"

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumerGroup added in v1.14.0

type ConsumerGroup interface {
	Consume(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error
	Errors() <-chan error
	Close() error
}

type ConsumerGroupCreator added in v1.14.0

type ConsumerGroupCreator interface {
	Create(brokers []string, group string, cfg *sarama.Config) (ConsumerGroup, error)
}

type ConsumerGroupHandler added in v1.14.0

type ConsumerGroupHandler struct {
	MaxMessageLen int
	TopicTag      string
	// contains filtered or unexported fields
}

ConsumerGroupHandler is a sarama.ConsumerGroupHandler implementation.

func NewConsumerGroupHandler added in v1.14.0

func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, parser parsers.Parser, log telegraf.Logger) *ConsumerGroupHandler

func (*ConsumerGroupHandler) Cleanup added in v1.14.0

Cleanup stops the internal goroutine and is called after all ConsumeClaim functions have completed.

func (*ConsumerGroupHandler) ConsumeClaim added in v1.14.0

ConsumeClaim is called once each claim in a goroutine and must be thread-safe. Should run until the claim is closed.

func (*ConsumerGroupHandler) Handle added in v1.14.0

Handle processes a message and if successful saves it to be acknowledged after delivery.

func (*ConsumerGroupHandler) Reserve added in v1.14.0

func (h *ConsumerGroupHandler) Reserve(ctx context.Context) error

Reserve blocks until there is an available slot for a new message.

func (*ConsumerGroupHandler) Setup added in v1.14.0

Setup is called once when a new session is opened. It setups up the handler and begins processing delivered messages.

type KafkaConsumer added in v1.14.0

type KafkaConsumer struct {
	Brokers                []string        `toml:"brokers"`
	ConsumerGroup          string          `toml:"consumer_group"`
	MaxMessageLen          int             `toml:"max_message_len"`
	MaxUndeliveredMessages int             `toml:"max_undelivered_messages"`
	MaxProcessingTime      config.Duration `toml:"max_processing_time"`
	Offset                 string          `toml:"offset"`
	BalanceStrategy        string          `toml:"balance_strategy"`
	Topics                 []string        `toml:"topics"`
	TopicTag               string          `toml:"topic_tag"`
	ConsumerFetchDefault   config.Size     `toml:"consumer_fetch_default"`

	kafka.ReadConfig

	Log telegraf.Logger `toml:"-"`

	ConsumerCreator ConsumerGroupCreator `toml:"-"`
	// contains filtered or unexported fields
}

func (*KafkaConsumer) Gather added in v1.14.0

func (k *KafkaConsumer) Gather(_ telegraf.Accumulator) error

func (*KafkaConsumer) Init added in v1.14.0

func (k *KafkaConsumer) Init() error

func (*KafkaConsumer) SampleConfig added in v1.14.0

func (*KafkaConsumer) SampleConfig() string

func (*KafkaConsumer) SetParser added in v1.14.0

func (k *KafkaConsumer) SetParser(parser parsers.Parser)

func (*KafkaConsumer) Start added in v1.14.0

func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error

func (*KafkaConsumer) Stop added in v1.14.0

func (k *KafkaConsumer) Stop()

type Message added in v1.14.0

type Message struct {
	// contains filtered or unexported fields
}

Message is an aggregate type binding the Kafka message and the session so that offsets can be updated.

type SaramaCreator added in v1.14.0

type SaramaCreator struct{}

func (*SaramaCreator) Create added in v1.14.0

func (*SaramaCreator) Create(brokers []string, group string, cfg *sarama.Config) (ConsumerGroup, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL