kafkaconsumer

package
v0.2.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 6, 2023 License: MIT Imports: 12 Imported by: 0

README

Kafka Consumer Input Plugin

The Kafka consumer plugin reads from Kafka and creates metrics using one of the supported input data formats.

For old Kafka versions (< 0.8), please use the kafka_consumer_legacy input plugin and the old ZooKeeper connection method.

Configuration
[[inputs.kafka_consumer]]
  instance_id = "" # unique instance identifier (REQUIRED)

  ## Kafka brokers.
  brokers = ["localhost:9092"]

  ## Topics to consume.
  topics = ["circonus"]

  ## When set this tag will be added to all metrics with the topic as the value.
  # topic_tag = ""

  ## Optional Client id
  # client_id = "circonus"

  ## Set the minimal supported Kafka version.  Setting this enables the use of new
  ## Kafka features and APIs.  Must be 0.10.2.0 or greater.
  ##   ex: version = "1.1.0"
  # version = ""

  ## Optional TLS Config
  # tls_ca = "/etc/circonus/ca.pem"
  # tls_cert = "/etc/circonus/cert.pem"
  # tls_key = "/etc/circonus/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## SASL authentication credentials.  These settings should typically be used
  ## with TLS encryption enabled
  # sasl_username = "kafka"
  # sasl_password = "secret"

  ## Optional SASL mechanism;
  ## one of: OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI
  ## (defaults to PLAIN)
  # sasl_mechanism = ""

  ## used if sasl_mechanism is GSSAPI (experimental)
  # sasl_gssapi_service_name = ""
  # ## One of: KRB5_USER_AUTH and KRB5_KEYTAB_AUTH
  # sasl_gssapi_auth_type = "KRB5_USER_AUTH"
  # sasl_gssapi_kerberos_config_path = "/"
  # sasl_gssapi_realm = "realm"
  # sasl_gssapi_key_tab_path = ""
  # sasl_gssapi_disable_pafxfast = false
  ## used if sasl_mechanism is OAUTHBEARER (experimental)
  # sasl_access_token = ""

  ## SASL protocol version.  When connecting to Azure EventHub set to 0.
  # sasl_version = 1

  ## Name of the consumer group.
  # consumer_group = "circonus_metrics_consumers"

  ## Compression codec represents the various compression codecs recognized by
  ## Kafka in messages.
  ##  0 : None
  ##  1 : Gzip
  ##  2 : Snappy
  ##  3 : LZ4
  ##  4 : ZSTD
  # compression_codec = 0

  ## Initial offset position; one of "oldest" or "newest".
  # offset = "oldest"

  ## Consumer group partition assignment strategy; one of "range", "roundrobin" or "sticky".
  # balance_strategy = "range"

  ## Maximum length of a message to consume, in bytes (default 0/unlimited);
  ## larger messages are dropped
  max_message_len = 1000000

  ## Maximum messages to read from the broker that have not been written by an
  ## output.  For best throughput set based on the number of metrics within
  ## each message and the size of the output's metric_batch_size.
  ##
  ## For example, if each message from the queue contains 10 metrics and the
  ## output metric_batch_size is 1000, setting this to 100 will ensure that a
  ## full batch is collected and the write is triggered immediately without
  ## waiting until the next flush_interval.
  # max_undelivered_messages = 1000

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/circonus-labs/circonus-unified-agent/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumerGroup

type ConsumerGroup interface {
	Consume(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error
	Errors() <-chan error
	Close() error
}

type ConsumerGroupCreator

type ConsumerGroupCreator interface {
	Create(brokers []string, group string, config *sarama.Config) (ConsumerGroup, error)
}

type ConsumerGroupHandler

type ConsumerGroupHandler struct {
	MaxMessageLen int
	TopicTag      string
	// contains filtered or unexported fields
}

ConsumerGroupHandler is a sarama.ConsumerGroupHandler implementation.

func NewConsumerGroupHandler

func NewConsumerGroupHandler(acc cua.Accumulator, maxUndelivered int, parser parsers.Parser) *ConsumerGroupHandler

func (*ConsumerGroupHandler) Cleanup

Cleanup stops the internal goroutine and is called after all ConsumeClaim functions have completed.

func (*ConsumerGroupHandler) ConsumeClaim

ConsumeClaim is called once for each claim, in its own goroutine, and must be thread-safe. It should run until the claim is closed.

func (*ConsumerGroupHandler) Handle

Handle processes a message and if successful saves it to be acknowledged after delivery.

func (*ConsumerGroupHandler) Reserve

func (h *ConsumerGroupHandler) Reserve(ctx context.Context) error

Reserve blocks until there is an available slot for a new message.

func (*ConsumerGroupHandler) Setup

Setup is called once when a new session is opened. It sets up the handler and begins processing delivered messages.

type KafkaConsumer

type KafkaConsumer struct {
	Brokers                []string `toml:"brokers"`
	ConsumerGroup          string   `toml:"consumer_group"`
	MaxMessageLen          int      `toml:"max_message_len"`
	MaxUndeliveredMessages int      `toml:"max_undelivered_messages"`
	Offset                 string   `toml:"offset"`
	BalanceStrategy        string   `toml:"balance_strategy"`
	Topics                 []string `toml:"topics"`
	TopicTag               string   `toml:"topic_tag"`

	kafka.ReadConfig

	Log cua.Logger `toml:"-"`

	ConsumerCreator ConsumerGroupCreator `toml:"-"`
	// contains filtered or unexported fields
}

func (*KafkaConsumer) Description

func (k *KafkaConsumer) Description() string

func (*KafkaConsumer) Gather

func (*KafkaConsumer) Init

func (k *KafkaConsumer) Init() error

func (*KafkaConsumer) SampleConfig

func (k *KafkaConsumer) SampleConfig() string

func (*KafkaConsumer) SetParser

func (k *KafkaConsumer) SetParser(parser parsers.Parser)

func (*KafkaConsumer) Start

func (k *KafkaConsumer) Start(ctx context.Context, acc cua.Accumulator) error

func (*KafkaConsumer) Stop

func (k *KafkaConsumer) Stop()

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message is an aggregate type binding the Kafka message and the session so that offsets can be updated.

type SaramaCreator

type SaramaCreator struct{}

func (*SaramaCreator) Create

func (*SaramaCreator) Create(brokers []string, group string, config *sarama.Config) (ConsumerGroup, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL