kafka

v1.1.0 Latest
Warning: This package is not in the latest version of its module.
Published: Oct 22, 2023 License: MIT Imports: 10 Imported by: 0

README

go-kafka-checker

An Apache Kafka checker for InVisionApp go-health.

How it works

The Apache Kafka checker implements a simple roundtrip check that sends a random message to a specific topic and checks that the message is received within the time specified in the configuration.

Installation

go get github.com/3rs4lg4d0/go-kafka-checker

Usage

// Create a new health instance.
h := health.New()

// Create a Kafka check that skips up to the first three consumer timeouts, if any occur.
kafkaCheck, err := kafka.NewKafka(kafka.KafkaConfig{
	BootstrapServers:     "localhost:19092",
	SkipConsumerTimeouts: 3,
})
if err != nil {
	log.Fatalf("unable to create the Kafka check: %v", err)
}

A complete example is available in the repository.

Configuration

You can configure the checker using the KafkaConfig struct.

type KafkaConfig struct {
    BootstrapServers     string
    Topic                string
    PollTimeout          time.Duration
    CheckTimeout         time.Duration
    SkipConsumerTimeouts int
    ConsumerConfig       map[string]any
    ProducerConfig       map[string]any
}

| Property | Type | Default value | Description |
|---|---|---|---|
| BootstrapServers | string | - | Comma-separated list of Kafka brokers. This property is mandatory. |
| Topic | string | health-checks | Topic to connect to (make sure it exists). |
| PollTimeout | time.Duration | 200 ms | The maximum time spent fetching data from the topic. |
| CheckTimeout | time.Duration | 1000 ms | The maximum time to wait for the check to complete. |
| SkipConsumerTimeouts | int | 0 | Maximum number of check timeouts to skip at the beginning when consuming messages. |
| ConsumerConfig | map[string]any | 0 | Consumer configuration (see https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md). |
| ProducerConfig | map[string]any | 0 | Producer configuration (see https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md). |

SkipConsumerTimeouts helps avoid spurious unhealthy results, because joining a consumer group and receiving a partition assignment can take some time. If you set this property to a value n greater than zero, then for at most the first n checks any timeout that occurs while consuming messages is skipped, and you will see output like this:

{
  "details": {
    "kafka-check": {
      "name": "kafka-check",
      "status": "ok",
      "fatal": true,
      "details": {
        "info": "skipped check timeout (2 remaining)"
      },
      "check_time": "2023-10-17T00:48:57.51717+02:00",
      "num_failures": 0,
      "first_failure_at": "0001-01-01T00:00:00Z"
    }
  },
  "status": "ok"
}

If a different kind of error occurs during the first checks (e.g. a Kafka error), SkipConsumerTimeouts is reset to zero internally, even if you set it to a value greater than zero. The same reset happens as soon as a check completes successfully.

With ConsumerConfig and ProducerConfig you can provide specific configuration (e.g. security settings), but the following properties are ignored because the Kafka checker sets them internally:

  • bootstrap.servers
  • group.id
  • auto.offset.reset
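
One way to picture why these properties are ignored is a merge in which the checker's own values always win. The sketch below is an assumption about the mechanism, not the package's code: `mergeConfig` is a hypothetical helper, and the `group.id` and `auto.offset.reset` values are placeholders.

```go
package main

import "fmt"

// mergeConfig copies the user-provided properties into a new map and then
// applies the internal properties, so internal keys always take precedence.
func mergeConfig(user, internal map[string]any) map[string]any {
	merged := make(map[string]any, len(user)+len(internal))
	for k, v := range user {
		merged[k] = v
	}
	for k, v := range internal {
		merged[k] = v
	}
	return merged
}

func main() {
	user := map[string]any{
		"security.protocol": "SASL_SSL",
		"bootstrap.servers": "ignored:9092", // overwritten below
	}
	internal := map[string]any{
		"bootstrap.servers": "localhost:19092",
		"group.id":          "placeholder-group", // placeholder value
		"auto.offset.reset": "latest",            // placeholder value
	}
	merged := mergeConfig(user, internal)
	fmt.Println(merged["bootstrap.servers"], merged["security.protocol"])
}
```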

Documentation

Types

type ConsumerConfig added in v1.1.0

type ConsumerConfig map[string]any

type Kafka

type Kafka struct {
	// contains filtered or unexported fields
}

Kafka implements the "ICheckable" interface.
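
For readers new to go-health, the sketch below shows what satisfying such an interface looks like. The local `checkable` interface is an assumption that mirrors the `Status() (interface{}, error)` signature documented for Kafka; `stubCheck` is a hypothetical checker used only for illustration.

```go
package main

import "fmt"

// checkable mirrors the shape assumed for go-health's ICheckable interface.
type checkable interface {
	Status() (interface{}, error)
}

// stubCheck is a hypothetical checker that always reports healthy.
type stubCheck struct{}

// Status returns optional details and a nil error to signal a healthy state.
func (stubCheck) Status() (interface{}, error) {
	return map[string]string{"info": "stub is healthy"}, nil
}

// Compile-time assertion that stubCheck satisfies checkable.
var _ checkable = stubCheck{}

func main() {
	var c checkable = stubCheck{}
	details, err := c.Status()
	fmt.Println(details, err)
}
```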

func NewKafka

func NewKafka(cfg KafkaConfig) (*Kafka, error)

NewKafka builds a go-kafka check initialized with the provided configuration.

func (*Kafka) Status

func (k *Kafka) Status() (interface{}, error)

Status executes the Kafka health check. It sends a random message to the configured health check topic and then confirms that the consumer receives that message within the configured timeouts.

type KafkaConfig

type KafkaConfig struct {
	BootstrapServers     string         // comma-separated list of kafka brokers
	Topic                string         // topic to connect to (make sure it exists)
	PollTimeout          time.Duration  // time spent fetching the data from the topic
	CheckTimeout         time.Duration  // maximum time to wait for the check to complete
	SkipConsumerTimeouts int            // maximum number of check timeouts to skip at the beginning when consuming messages
	ConsumerConfig       ConsumerConfig // consumer configuration (see https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md)
	ProducerConfig       ProducerConfig // producer configuration (see https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md)
}

KafkaConfig is used for configuring the go-kafka check.

type ProducerConfig added in v1.1.0

type ProducerConfig map[string]any
