kafkaconsumer

package
v0.1.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 23, 2023 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type KafkaConsumer

type KafkaConsumer struct {
	fbcontext.ContextAware
	// contains filtered or unexported fields
}

KafkaConsumer is a firebolt source that receives records from a Kafka topic.

func (*KafkaConsumer) GetMetrics

func (k *KafkaConsumer) GetMetrics() *Metrics

GetMetrics returns the instance of ConsumerMetrics used by this kafkaconsumer.

func (*KafkaConsumer) Receive

func (k *KafkaConsumer) Receive(msg fbcontext.Message) error

Receive handles a message from another node or an external source

func (*KafkaConsumer) Setup

func (k *KafkaConsumer) Setup(config map[string]string, eventchan chan firebolt.Event) error

Setup instantiates and configures the underlying Kafka consumer client

func (*KafkaConsumer) Shutdown

func (k *KafkaConsumer) Shutdown() error

Shutdown stops the Kafka consumer client

func (*KafkaConsumer) Start

func (k *KafkaConsumer) Start() error

Start subscribes the Kafka consumer client to the configured topic and starts reading records from the consumer's channel

type Metrics

type Metrics struct {
	EventsConsumed     *prometheus.CounterVec
	StoredOffset       *prometheus.GaugeVec
	LowWatermark       *prometheus.GaugeVec
	HighWatermark      *prometheus.GaugeVec
	ConsumerLag        *prometheus.GaugeVec
	ConsumerLagStored  *prometheus.GaugeVec
	RecoveryEvents     *prometheus.CounterVec
	RecoveryRemaining  *prometheus.GaugeVec
	RecoveryPartitions prometheus.Gauge
}

Metrics encapsulates the prometheus collectors used to record metrics about the consumer

func (*Metrics) RegisterConsumerMetrics

func (m *Metrics) RegisterConsumerMetrics()

RegisterConsumerMetrics initializes gauges for tracking consumer state and registers them with the prometheus client

func (*Metrics) UpdateConsumerMetrics

func (m *Metrics) UpdateConsumerMetrics(statsJSON string, topic string)

UpdateConsumerMetrics takes the JSON stats reported by the librdkafka consumer and parses it then updates the prometheus gauges.

type PartitionStats

type PartitionStats struct {
	// contains filtered or unexported fields
}

PartitionStats is a struct for holding the statistics emitted by the librdkafka consumer that underlies confluent-kafka-go after they are parsed from their original JSON https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md#partitions

type RecoveryConsumer

type RecoveryConsumer struct {
	// contains filtered or unexported fields
}

RecoveryConsumer is a rate-limited consumer that reprocesses any records that were missed during an outage. Whenever parallel recover is enabled, if KafkaConsumer is initially assigned a partition with high watermark too far ahead (as defined by config 'maxpartitionlag') there will be a gap of missed records between the old stored offset and (highwatermark - maxpartitionlag). KafkaConsumer will request recovery for that partition, those requests are managed by RecoveryTracker. The kafkaconsumer's partition assignments, which are auto-balanced, drive the recoveryconsumer as well. There are two orders of operation for starting recovery for a partition: *Initial Request, Partition Assignment Exists First* kafkaconsumer is assigned a partition, detects a recovery condition, requests recovery, and the call to RequestRecovery() synchronously assigns that partition within RecoveryConsumer *Rebalance Assignment, Recovery Request Exists First* kafkaconsumer is assigned a partition, notifies recoveryconsumer by calling AssignPartitions, fetches the recovery request for the partition from recoverytracker, and assigns the partition within recoveryconsumer

Recovery is rate limited by config 'parallelrecoverymaxrate'; this represents the total number of events per second to receiver; the value should be set to ensure that downstream systems are not overwhelmed during recovery.

There are a few different types of race conditions to worry about here... * Race conditions on consumer rebalance: If one node gets partitions assigned, finds a gap between stored/highwatermark, and creates a request, then a consumer rebalance follows and the same partition assigned to another instance...

  • if that happens *before* offsets have been saved by the original assignee, a new request will be created that should match or instead have a slightly higher highwatermark (and thus toOffset), which is fine to overwrite the old recovery request
  • if that request happens *after* offsets saved, a new request should not be created because that saved offset is within 'maxpartitionlag', and the existing request is still valid
  • Race conditions within recovery, between partition assignment to recoveryconsumer and recoveryrequest availability in

recoverytracker:

  • on startup if recoveryrequests already exist, partitions may be assigned in kafkaconsumer before recoverytracker has consumed those existing recoveryrequests, so when recoverytracker *does* consume them recoveryconsumer must update; we use a ticker in recoveryconsumer that refreshes assignments from recoverytracker periodically

What about a case where a recovery request is in progress for an outage, and there's a second outage? Because offsets have been stored, if the second outage was short enough to not generate a new recovery request then the old recovery request will restart. However if the second outage was long enough to generate a new recovery request, it will clobber the old one, and that old in-progress recovery is abandoned. This shortcoming is worth addressing in a future release - we wouldn't want multiple recoveries running at the same time for a single partition, but each recovery request could be an ordered queue of requests, with request offset ranges that overlap automatically merged.

func NewRecoveryConsumer

func NewRecoveryConsumer(topic string, sendCh chan firebolt.Event, config map[string]string, metrics *Metrics, ctx fbcontext.FBContext) (*RecoveryConsumer, error)

NewRecoveryConsumer creates a RecoveryConsumer

func (*RecoveryConsumer) RefreshAssignments

func (rc *RecoveryConsumer) RefreshAssignments() error

RefreshAssignments updates the current partitions that are being recovered when it's known that the set of recoveryrequests managed by RecoveryTracker may have changed.

func (*RecoveryConsumer) RequestRecovery

func (rc *RecoveryConsumer) RequestRecovery(partitionID int32, fromOffset kafka.Offset, toOffset kafka.Offset)

RequestRecovery creates a RecoveryTask to track recovery for a single partition.

func (*RecoveryConsumer) SetAssignedPartitions

func (rc *RecoveryConsumer) SetAssignedPartitions(partitions []kafka.TopicPartition)

SetAssignedPartitions updates the slice of partitions that are currently assigned in kafkaconsumer.

func (*RecoveryConsumer) Shutdown

func (rc *RecoveryConsumer) Shutdown()

Shutdown stops the recovery consumer

type RecoveryRequest

type RecoveryRequest struct {
	PartitionID int32     `json:"partition_id"`
	FromOffset  int64     `json:"from_offset"`
	ToOffset    int64     `json:"to_offset"`
	Created     time.Time `json:"created"`
}

RecoveryRequest is a request for recovery to fill in missed data for a single partition.

type RecoveryRequests

type RecoveryRequests struct {
	Requests []*RecoveryRequest `json:"recovery_requests"`
}

RecoveryRequests is the ordered list of active recoveries for a single partition.

type RecoveryTracker

type RecoveryTracker struct {
	// contains filtered or unexported fields
}

RecoveryTracker uses a Kafka compact topic to persist requests for partition data recovery. It caches the most recent recoveryRequests

func NewRecoveryTracker

func NewRecoveryTracker(metrics *Metrics, ctx fbcontext.FBContext) (*RecoveryTracker, error)

NewRecoveryTracker creates a RecoveryTracker which uses the messaging framework to manage the recovery process

func (*RecoveryTracker) AddRecoveryRequest

func (rt *RecoveryTracker) AddRecoveryRequest(partitionID int32, fromOffset int64, toOffset int64) error

AddRecoveryRequest creates and persists a new recovery request for the passed partition

func (*RecoveryTracker) GetRecoveryRequest

func (rt *RecoveryTracker) GetRecoveryRequest(partitionID int32) *RecoveryRequest

GetRecoveryRequest returns the most recent RecoveryRequest for the partition passed. It may return 'nil' if either no RecoveryRequest has ever been created for this partition or if the most recent RecoveryRequest for the partition has been marked complete.

func (*RecoveryTracker) MarkRecoveryComplete

func (rt *RecoveryTracker) MarkRecoveryComplete(partitionID int32, toOffset int64) error

MarkRecoveryComplete sends an updated RecoveryRequest message to mark the recovery complete.

func (*RecoveryTracker) RecoveryRequestCount

func (rt *RecoveryTracker) RecoveryRequestCount() int

RecoveryRequestCount returns the number of currently tracked recovery requests.

func (*RecoveryTracker) Shutdown

func (rt *RecoveryTracker) Shutdown()

Shutdown stops the recovery tracker

func (*RecoveryTracker) UpdateRecoveryRequest

func (rt *RecoveryTracker) UpdateRecoveryRequest(partitionID int32, fromOffset int64, toOffset int64) error

UpdateRecoveryRequest persists the passed request to recover skipped data for a partition to a Kafka compact topic.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL