kafka

package
v1.0.2 Latest
Warning

This package is not in the latest version of its module.

Published: Sep 25, 2020 License: MIT Imports: 23 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Cluster added in v0.1.0

type Cluster struct {
	// contains filtered or unexported fields
}

Cluster is a module which connects to a Kafka cluster and periodically fetches all topic and partition information (e.g. high and low water marks). This information is passed to the storage module, where the Prometheus collector can retrieve it to expose metrics.

func NewCluster added in v0.1.0

func NewCluster(opts *options.Options, storageCh chan<- *StorageRequest) *Cluster

NewCluster creates a new cluster module and tries to connect to the Kafka cluster. If it cannot connect to the cluster, it panics.
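The constructor takes only the send side of the storage channel (`chan<- *StorageRequest`). The following is a minimal sketch of that wiring pattern, not the package's implementation: `StorageRequest` is a reduced local stand-in, and `producer`, `newProducer`, `publish`, and `collect` are hypothetical names introduced for illustration.

```go
package main

import "fmt"

// StorageRequest is a local stand-in for the package's type, reduced to two fields.
type StorageRequest struct {
	TopicName   string
	PartitionID int32
}

// producer is a hypothetical module. Like NewCluster, it only receives the
// send-only end of the channel, so it cannot read requests back.
type producer struct {
	storageCh chan<- *StorageRequest
}

func newProducer(storageCh chan<- *StorageRequest) *producer {
	return &producer{storageCh: storageCh}
}

// publish sends one request per partition and then returns.
func (p *producer) publish(topic string, partitions int32) {
	for id := int32(0); id < partitions; id++ {
		p.storageCh <- &StorageRequest{TopicName: topic, PartitionID: id}
	}
}

// collect wires a producer to a channel and drains it, playing the role
// of the storage module on the receiving end.
func collect(topic string, partitions int32) []int32 {
	ch := make(chan *StorageRequest)
	go func() {
		newProducer(ch).publish(topic, partitions)
		close(ch)
	}()
	var ids []int32
	for req := range ch {
		ids = append(ids, req.PartitionID)
	}
	return ids
}

func main() {
	fmt.Println(collect("orders", 3)) // prints [0 1 2]
}
```

Restricting the parameter to `chan<-` lets the compiler enforce that a producing module never consumes from the storage channel.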

func (*Cluster) IsHealthy added in v0.1.2

func (module *Cluster) IsHealthy() bool

IsHealthy returns true if at least one broker is reachable.
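One plausible use of a boolean health flag is an HTTP readiness endpoint. The sketch below assumes nothing beyond the documented `IsHealthy() bool` signature; `healthChecker`, `fakeCluster`, `healthHandler`, `probe`, and the `/healthz` path are hypothetical and are not part of this package.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// healthChecker is satisfied by any type with an IsHealthy() bool method,
// which matches the documented signature of (*Cluster).IsHealthy.
type healthChecker interface {
	IsHealthy() bool
}

// fakeCluster is a stand-in so the sketch runs without a broker.
type fakeCluster struct{ healthy bool }

func (f fakeCluster) IsHealthy() bool { return f.healthy }

// healthHandler maps the health flag to HTTP 200 or 503.
func healthHandler(c healthChecker) http.HandlerFunc {
	return func(w http.ResponseWriter, _ *http.Request) {
		if !c.IsHealthy() {
			http.Error(w, "no reachable broker", http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusOK)
	}
}

// probe runs the handler once in memory and returns the status code.
func probe(c healthChecker) int {
	rec := httptest.NewRecorder()
	healthHandler(c)(rec, httptest.NewRequest(http.MethodGet, "/healthz", nil))
	return rec.Code
}

func main() {
	fmt.Println(probe(fakeCluster{healthy: true}), probe(fakeCluster{healthy: false})) // prints 200 503
}
```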

func (*Cluster) Start added in v0.1.0

func (module *Cluster) Start()

Start starts the cluster module.

type ConsumerGroupMetadata added in v0.1.2

type ConsumerGroupMetadata struct {
	Group   string
	Header  metadataHeader
	Members []metadataMember
}

ConsumerGroupMetadata contains additional information about consumer groups, such as:

- Partition assignments (which hosts are assigned to partitions)
- Session timeouts (hosts which haven't sent the keep alive in time)
- Group rebalancing

type ConsumerPartitionOffset added in v0.1.0

type ConsumerPartitionOffset struct {
	Group     string
	Topic     string
	Partition int32
	Offset    int64
	Timestamp time.Time
}

ConsumerPartitionOffset represents a consumer group commit which can be decoded from the __consumer_offsets topic.

type OffsetConsumer added in v0.1.0

type OffsetConsumer struct {
	// contains filtered or unexported fields
}

OffsetConsumer is a consumer module which reads consumer group information from the offsets topic in a Kafka cluster. The offsets topic is typically named __consumer_offsets. All messages in this topic are binary and therefore they must first be decoded to access the information. This module consumes and processes all messages in the offsets topic.
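To illustrate what decoding a binary message involves, here is a sketch of a key decoder. It assumes the commonly described layout of offset commit keys (key versions 0 and 1): a big-endian int16 version, then group and topic as int16-length-prefixed strings, then an int32 partition. This layout is an assumption, not something this page specifies, and `offsetKey`, `readString`, `decodeOffsetKey`, and `encodeOffsetKey` are hypothetical helpers rather than this package's API.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// offsetKey is a hypothetical holder for the decoded key fields.
type offsetKey struct {
	Version   int16
	Group     string
	Topic     string
	Partition int32
}

// readString reads a big-endian int16 length followed by that many bytes.
func readString(r *bytes.Reader) (string, error) {
	var n int16
	if err := binary.Read(r, binary.BigEndian, &n); err != nil {
		return "", err
	}
	if n < 0 || int(n) > r.Len() {
		return "", errors.New("invalid string length")
	}
	buf := make([]byte, n)
	if _, err := io.ReadFull(r, buf); err != nil {
		return "", err
	}
	return string(buf), nil
}

// decodeOffsetKey parses a key under the assumed version 0/1 layout and
// rejects every other version.
func decodeOffsetKey(raw []byte) (offsetKey, error) {
	r := bytes.NewReader(raw)
	var key offsetKey
	var err error
	if err = binary.Read(r, binary.BigEndian, &key.Version); err != nil {
		return key, err
	}
	if key.Version < 0 || key.Version > 1 {
		return key, fmt.Errorf("key version %d is not an offset commit", key.Version)
	}
	if key.Group, err = readString(r); err != nil {
		return key, err
	}
	if key.Topic, err = readString(r); err != nil {
		return key, err
	}
	err = binary.Read(r, binary.BigEndian, &key.Partition)
	return key, err
}

// encodeOffsetKey builds a key in the same assumed layout, for the demo only.
func encodeOffsetKey(version int16, group, topic string, partition int32) []byte {
	var buf bytes.Buffer
	binary.Write(&buf, binary.BigEndian, version)
	binary.Write(&buf, binary.BigEndian, int16(len(group)))
	buf.WriteString(group)
	binary.Write(&buf, binary.BigEndian, int16(len(topic)))
	buf.WriteString(topic)
	binary.Write(&buf, binary.BigEndian, partition)
	return buf.Bytes()
}

func main() {
	key, err := decodeOffsetKey(encodeOffsetKey(1, "billing", "orders", 7))
	fmt.Println(key, err)
}
```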

func NewOffsetConsumer added in v0.1.0

func NewOffsetConsumer(opts *options.Options, storageChannel chan<- *StorageRequest) *OffsetConsumer

NewOffsetConsumer creates a consumer which processes all messages in the __consumer_offsets topic. If it cannot connect to the cluster, it panics.

func (*OffsetConsumer) Start added in v0.1.0

func (module *OffsetConsumer) Start()

Start creates a partition consumer for each partition in the offsets topic and starts consuming them.
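The one-consumer-per-partition pattern can be sketched with goroutines and a `sync.WaitGroup`. This is an illustration under stated assumptions, not the module's code: `consumePartition` and `startAll` are hypothetical, and each stand-in consumer reports once and exits, whereas a real partition consumer would keep reading.

```go
package main

import (
	"fmt"
	"sync"
)

// consumePartition is a hypothetical stand-in for a partition consumer.
// It reports its partition ID once and then finishes.
func consumePartition(id int32, out chan<- int32, wg *sync.WaitGroup) {
	defer wg.Done()
	out <- id
}

// startAll launches one goroutine per partition and counts the reports.
func startAll(partitions int32) int {
	out := make(chan int32)
	var wg sync.WaitGroup
	for id := int32(0); id < partitions; id++ {
		wg.Add(1)
		go consumePartition(id, out, &wg)
	}
	// Close the channel once every consumer has finished, so the
	// range loop below terminates.
	go func() {
		wg.Wait()
		close(out)
	}()
	seen := 0
	for range out {
		seen++
	}
	return seen
}

func main() {
	fmt.Println(startAll(50)) // prints 50
}
```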

type PartitionWaterMark added in v0.1.0

type PartitionWaterMark struct {
	TopicName   string
	PartitionID int32
	WaterMark   int64
	Timestamp   int64
}

PartitionWaterMark contains either the first or last known committed offset (water mark) for a partition
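A common use of a high water mark together with a committed offset is estimating consumer lag. The sketch below mirrors the two documented structs locally; the `lag` function and its clamping to zero are assumptions for illustration, since this page does not describe how metrics are derived.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local mirrors of the documented structs.
type PartitionWaterMark struct {
	TopicName   string
	PartitionID int32
	WaterMark   int64
	Timestamp   int64
}

type ConsumerPartitionOffset struct {
	Group     string
	Topic     string
	Partition int32
	Offset    int64
	Timestamp time.Time
}

// lag is a hypothetical helper: high water mark minus committed offset,
// clamped at zero because the two values are sampled at different times.
func lag(high PartitionWaterMark, commit ConsumerPartitionOffset) (int64, error) {
	if high.TopicName != commit.Topic || high.PartitionID != commit.Partition {
		return 0, errors.New("water mark and commit refer to different partitions")
	}
	l := high.WaterMark - commit.Offset
	if l < 0 {
		l = 0
	}
	return l, nil
}

func main() {
	high := PartitionWaterMark{TopicName: "orders", PartitionID: 0, WaterMark: 120}
	commit := ConsumerPartitionOffset{Group: "billing", Topic: "orders", Partition: 0, Offset: 100, Timestamp: time.Now()}
	fmt.Println(lag(high, commit)) // prints 20 <nil>
}
```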

type StorageRequest added in v0.1.0

type StorageRequest struct {
	RequestType        StorageRequestType
	ConsumerOffset     *ConsumerPartitionOffset
	PartitionWaterMark *PartitionWaterMark
	TopicConfig        *TopicConfiguration
	GroupMetadata      *ConsumerGroupMetadata
	ConsumerGroupName  string
	TopicName          string
	PartitionID        int32
	PartitionCount     int
	SizeByTopic        map[string]int64
	SizeByBroker       map[int32]int64
}

StorageRequest is an entity to send messages / requests to the storage module.
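As a sketch of how a request might be assembled, the helper below builds a high water mark request. Which fields each request type requires is an assumption here, as is treating `Timestamp` as Unix milliseconds; the types are reduced local mirrors and `newHighWaterMarkRequest` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

type StorageRequestType int

// Only the first documented constant is mirrored here; its value is 1.
const StorageAddPartitionHighWaterMark StorageRequestType = iota + 1

type PartitionWaterMark struct {
	TopicName   string
	PartitionID int32
	WaterMark   int64
	Timestamp   int64
}

// StorageRequest is reduced to the two fields this sketch populates.
type StorageRequest struct {
	RequestType        StorageRequestType
	PartitionWaterMark *PartitionWaterMark
}

// newHighWaterMarkRequest is a hypothetical helper. It assumes that a high
// water mark request carries its payload in PartitionWaterMark and that the
// timestamp is expressed in Unix milliseconds.
func newHighWaterMarkRequest(topic string, partition int32, offset, timestampMs int64) *StorageRequest {
	return &StorageRequest{
		RequestType: StorageAddPartitionHighWaterMark,
		PartitionWaterMark: &PartitionWaterMark{
			TopicName:   topic,
			PartitionID: partition,
			WaterMark:   offset,
			Timestamp:   timestampMs,
		},
	}
}

func main() {
	nowMs := time.Now().UnixNano() / int64(time.Millisecond)
	req := newHighWaterMarkRequest("orders", 2, 120, nowMs)
	fmt.Println(req.RequestType, req.PartitionWaterMark.TopicName, req.PartitionWaterMark.WaterMark)
}
```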

type StorageRequestType added in v0.1.0

type StorageRequestType int

StorageRequestType is used to determine the message type / request when communicating with the storage module via a channel. Depending on the request type, you must provide additional information so that the request can be processed.

const (
	// StorageAddPartitionHighWaterMark is the request type to add a partition's high water mark
	StorageAddPartitionHighWaterMark StorageRequestType = iota + 1

	// StorageAddPartitionLowWaterMark is the request type to add a partition's low water mark
	StorageAddPartitionLowWaterMark

	// StorageAddConsumerOffset is the request type to add a consumer's offset commit
	StorageAddConsumerOffset

	// StorageAddGroupMetadata is the request type to add a group member's partition assignment
	StorageAddGroupMetadata

	// StorageAddTopicConfiguration is the request type to add configuration entries for a topic
	StorageAddTopicConfiguration

	// StorageAddSizeByTopic is the request type to add aggregated partition sizes grouped by topic
	StorageAddSizeByTopic

	// StorageAddSizeByBroker is the request type to add aggregated partition sizes grouped by broker
	StorageAddSizeByBroker

	// StorageDeleteConsumerGroup is the request type to remove an offset commit for a topic:group:partition combination
	StorageDeleteConsumerGroup

	// StorageRegisterOffsetPartitions is the request type to make the storage module aware that the offset consumer
	// first has to fully consume a specific number of partitions before it should expose any metrics
	StorageRegisterOffsetPartitions

	// StorageMarkOffsetPartitionReady is the request type to mark a partition consumer of the consumer offsets topic
	// as ready (=caught up partition lag)
	StorageMarkOffsetPartitionReady

	// StorageDeleteGroupMetadata is the request type to delete a group member's partition assignment
	StorageDeleteGroupMetadata

	// StorageDeleteTopic is the request type to delete all topic information
	StorageDeleteTopic
)
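A receiver would typically switch on the request type. The sketch below handles three of the types and tracks readiness of the offset partitions. It is a guess at the shape of such a receiver, not the actual storage module: `store`, `handle`, and `isReady` are hypothetical, and the assumption that `PartitionCount` and `PartitionID` carry the payload for the register and ready requests is not confirmed by this page. The constants are redeclared in the documented order so their values match.

```go
package main

import "fmt"

type StorageRequestType int

const (
	StorageAddPartitionHighWaterMark StorageRequestType = iota + 1
	StorageAddPartitionLowWaterMark
	StorageAddConsumerOffset
	StorageAddGroupMetadata
	StorageAddTopicConfiguration
	StorageAddSizeByTopic
	StorageAddSizeByBroker
	StorageDeleteConsumerGroup
	StorageRegisterOffsetPartitions
	StorageMarkOffsetPartitionReady
	StorageDeleteGroupMetadata
	StorageDeleteTopic
)

type PartitionWaterMark struct {
	TopicName   string
	PartitionID int32
	WaterMark   int64
}

// StorageRequest is reduced to the fields this sketch reads.
type StorageRequest struct {
	RequestType        StorageRequestType
	PartitionWaterMark *PartitionWaterMark
	PartitionID        int32
	PartitionCount     int
}

// store is a hypothetical receiver for storage requests.
type store struct {
	highWaterMarks map[string]int64
	expected       int
	ready          map[int32]bool
}

func newStore() *store {
	return &store{highWaterMarks: map[string]int64{}, ready: map[int32]bool{}}
}

// handle applies one request and rejects types it does not know.
func (s *store) handle(req *StorageRequest) error {
	switch req.RequestType {
	case StorageAddPartitionHighWaterMark:
		if req.PartitionWaterMark == nil {
			return fmt.Errorf("request type %d needs PartitionWaterMark", req.RequestType)
		}
		wm := req.PartitionWaterMark
		s.highWaterMarks[fmt.Sprintf("%s:%d", wm.TopicName, wm.PartitionID)] = wm.WaterMark
	case StorageRegisterOffsetPartitions:
		s.expected = req.PartitionCount
	case StorageMarkOffsetPartitionReady:
		s.ready[req.PartitionID] = true
	default:
		return fmt.Errorf("unhandled request type %d", req.RequestType)
	}
	return nil
}

// isReady reports whether every registered offset partition has caught up.
func (s *store) isReady() bool {
	return s.expected > 0 && len(s.ready) >= s.expected
}

func main() {
	s := newStore()
	s.handle(&StorageRequest{RequestType: StorageRegisterOffsetPartitions, PartitionCount: 2})
	s.handle(&StorageRequest{RequestType: StorageMarkOffsetPartitionReady, PartitionID: 0})
	fmt.Println(s.isReady()) // prints false
	s.handle(&StorageRequest{RequestType: StorageMarkOffsetPartitionReady, PartitionID: 1})
	fmt.Println(s.isReady()) // prints true
}
```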

type TopicConfiguration added in v0.1.2

type TopicConfiguration struct {
	TopicName      string
	PartitionCount int
	CleanupPolicy  string
}

TopicConfiguration holds configuration entries for a topic along with its partition count.
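A consumer of this struct may want to tell compacted topics apart. The sketch below assumes `CleanupPolicy` holds the raw value of Kafka's `cleanup.policy` setting, which can be `delete`, `compact`, or a comma-separated combination; `isCompacted` is a hypothetical helper, not part of this package.

```go
package main

import (
	"fmt"
	"strings"
)

// Local mirror of the documented struct.
type TopicConfiguration struct {
	TopicName      string
	PartitionCount int
	CleanupPolicy  string
}

// isCompacted reports whether "compact" appears among the comma-separated
// cleanup policies.
func isCompacted(cfg TopicConfiguration) bool {
	for _, policy := range strings.Split(cfg.CleanupPolicy, ",") {
		if strings.TrimSpace(policy) == "compact" {
			return true
		}
	}
	return false
}

func main() {
	cfg := TopicConfiguration{TopicName: "__consumer_offsets", PartitionCount: 50, CleanupPolicy: "compact"}
	fmt.Println(isCompacted(cfg)) // prints true
}
```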
