consumer

package
v1.4.1
Published: Feb 26, 2024 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Overview

Package consumer contains an interface for any consumer that is able to process messages. It also contains implementations of various Kafka consumers.

For more information please see: https://redhatinsights.github.io/insights-data-schemas/external-pipeline/ccx_data_pipeline.html

It is expected that consumed messages are generated by ccx-data-pipeline based on the OCP rules framework. The reports generated by the framework are enhanced with more context information taken from different sources, such as the organization ID, account number, unique cluster name, and the LastChecked timestamp (taken from the incoming Kafka record containing the URL to the archive).

It is also expected that each consumed message contains one INFO rule hit carrying the cluster version. That rule hit is produced by a special rule used only in the external data pipeline: "version_info|CLUSTER_VERSION_INFO"


Index

Constants

This section is empty.

Variables

View Source
var DefaultSaramaConfig *sarama.Config

DefaultSaramaConfig is the Sarama config that will be used by default. Here you can set a specific version of the protocol, which is useful, for example, for testing

Functions

func DecompressMessage added in v1.4.0

func DecompressMessage(messageValue []byte) ([]byte, error)

DecompressMessage will try to decompress the message if it is compressed with any supported method (GZIP at this moment)

func IsMessageInGzipFormat added in v1.4.0

func IsMessageInGzipFormat(messageValue []byte) bool

IsMessageInGzipFormat checks whether the message is in GZIP format: it returns true if it is and false otherwise
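The behavior of these two helpers can be sketched with a self-contained, hypothetical reimplementation: GZIP streams always begin with the two-byte magic number 0x1f 0x8b, so detection is a prefix check, and decompression is a pass-through for uncompressed input (the names isGzip and decompress below are stand-ins, not the package's exported functions):

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// gzipMagic is the two-byte magic number that opens every GZIP stream.
var gzipMagic = []byte{0x1f, 0x8b}

// isGzip reports whether the message starts with the GZIP magic number,
// mirroring what IsMessageInGzipFormat checks.
func isGzip(messageValue []byte) bool {
	return bytes.HasPrefix(messageValue, gzipMagic)
}

// decompress returns the message unchanged when it is not GZIP-compressed,
// otherwise it inflates it, mirroring DecompressMessage's contract.
func decompress(messageValue []byte) ([]byte, error) {
	if !isGzip(messageValue) {
		return messageValue, nil
	}
	reader, err := gzip.NewReader(bytes.NewReader(messageValue))
	if err != nil {
		return nil, err
	}
	defer reader.Close()
	return io.ReadAll(reader)
}

func main() {
	// Compress a sample payload, then round-trip it through decompress.
	var buf bytes.Buffer
	writer := gzip.NewWriter(&buf)
	writer.Write([]byte(`{"status":"ok"}`))
	writer.Close()

	plain, err := decompress(buf.Bytes())
	if err != nil {
		panic(err)
	}
	fmt.Println(isGzip(buf.Bytes()), string(plain)) // true {"status":"ok"}
}
```

Plain JSON messages fail the prefix check and are returned verbatim, which is why callers can feed every incoming message through the decompression step unconditionally.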

Types

type AttributeChecker added in v1.4.0

type AttributeChecker interface {
	IsEmpty() bool
}

AttributeChecker is an interface for checking if an attribute is empty.
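A minimal sketch of satisfying this one-method interface (the interface is reproduced locally so the snippet is self-contained; stringChecker is a hypothetical implementation, not part of the package):

```go
package main

import (
	"fmt"
	"strings"
)

// AttributeChecker mirrors the package interface: a single IsEmpty method.
type AttributeChecker interface {
	IsEmpty() bool
}

// stringChecker is a hypothetical implementation that treats
// whitespace-only values as empty.
type stringChecker struct {
	value string
}

func (s stringChecker) IsEmpty() bool {
	return strings.TrimSpace(s.value) == ""
}

func main() {
	checkers := []AttributeChecker{
		stringChecker{value: "   "},
		stringChecker{value: "cluster-id"},
	}
	for _, c := range checkers {
		fmt.Println(c.IsEmpty()) // true, then false
	}
}
```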

type Consumer

type Consumer interface {
	Serve()
	Close() error
	HandleMessage(msg *sarama.ConsumerMessage) error
}

Consumer represents any consumer of insights-rules messages

type DVORulesProcessor added in v1.4.0

type DVORulesProcessor struct {
}

DVORulesProcessor satisfies the MessageProcessor interface

type DvoMetrics added in v1.4.0

type DvoMetrics map[string]*json.RawMessage

DvoMetrics represents DVO workload recommendations received as part of the incoming message

type JSONAttributeChecker added in v1.4.0

type JSONAttributeChecker struct {
	// contains filtered or unexported fields
}

JSONAttributeChecker is an implementation of AttributeChecker for JSON data.

func (*JSONAttributeChecker) IsEmpty added in v1.4.0

func (j *JSONAttributeChecker) IsEmpty() bool

IsEmpty returns whether the JSON data is empty

type KafkaConsumer

type KafkaConsumer struct {
	Configuration    broker.Configuration
	ConsumerGroup    sarama.ConsumerGroup
	Storage          storage.Storage
	MessageProcessor MessageProcessor
	// contains filtered or unexported fields
}

KafkaConsumer is an implementation of the Consumer interface. Example:

kafkaConsumer, err := consumer.NewKafkaConsumer(brokerCfg, storage, processor)

if err != nil {
    panic(err)
}

kafkaConsumer.Serve()

err = kafkaConsumer.Close()

if err != nil {
    panic(err)
}

func NewDVORulesConsumer added in v1.4.0

func NewDVORulesConsumer(brokerCfg broker.Configuration, storage storage.DVORecommendationsStorage) (*KafkaConsumer, error)

NewDVORulesConsumer constructs a new implementation of the Consumer interface

func NewKafkaConsumer added in v1.4.0

func NewKafkaConsumer(brokerCfg broker.Configuration, storage storage.Storage, processor MessageProcessor) (*KafkaConsumer, error)

NewKafkaConsumer constructs a new implementation of the Consumer interface

func NewKafkaConsumerWithSaramaConfig added in v1.4.0

func NewKafkaConsumerWithSaramaConfig(
	brokerCfg broker.Configuration,
	storage storage.Storage,
	saramaConfig *sarama.Config,
	processor MessageProcessor,
) (*KafkaConsumer, error)

NewKafkaConsumerWithSaramaConfig constructs a new implementation of the Consumer interface with a custom Sarama config

func NewOCPRulesConsumer added in v1.4.0

func NewOCPRulesConsumer(brokerCfg broker.Configuration, storage storage.OCPRecommendationsStorage) (*KafkaConsumer, error)

NewOCPRulesConsumer constructs a new implementation of the Consumer interface

func (*KafkaConsumer) Cleanup

func (consumer *KafkaConsumer) Cleanup(sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*KafkaConsumer) Close

func (consumer *KafkaConsumer) Close() error

Close closes all resources used by the consumer

func (*KafkaConsumer) ConsumeClaim

func (consumer *KafkaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim starts a consumer loop of ConsumerGroupClaim's Messages().

func (*KafkaConsumer) GetNumberOfErrorsConsumingMessages

func (consumer *KafkaConsumer) GetNumberOfErrorsConsumingMessages() uint64

GetNumberOfErrorsConsumingMessages returns the number of errors encountered while consuming messages since the KafkaConsumer object was created

func (*KafkaConsumer) GetNumberOfSuccessfullyConsumedMessages

func (consumer *KafkaConsumer) GetNumberOfSuccessfullyConsumedMessages() uint64

GetNumberOfSuccessfullyConsumedMessages returns the number of successfully consumed messages since the KafkaConsumer object was created

func (*KafkaConsumer) HandleMessage

func (consumer *KafkaConsumer) HandleMessage(msg *sarama.ConsumerMessage) error

HandleMessage handles the message and does all logging, metrics, etc.

A log message is written for every step made during processing, but in order to reduce the number of messages sent to ElasticSearch, most messages are produced only when the log level is set to DEBUG.

A typical example of the log messages produced without the DEBUG log level during processing:

1:26PM INF started processing message message_timestamp=2023-07-26T13:26:54+02:00 offset=7 partition=0 topic=ccx.ocp.results
1:26PM INF Consumed group=aggregator offset=7 topic=ccx.ocp.results
1:26PM INF Read cluster=5d5892d3-1f74-4ccf-91af-548dfc9767aa offset=7 organization=11789772 partition=0 request ID=missing topic=ccx.ocp.results version=2
1:26PM WRN Received data with unexpected version. cluster=5d5892d3-1f74-4ccf-91af-548dfc9767aa offset=7 organization=11789772 partition=0 topic=ccx.ocp.results version=2
1:26PM INF Stored info report cluster=5d5892d3-1f74-4ccf-91af-548dfc9767aa offset=7 organization=11789772 partition=0 request ID=missing topic=ccx.ocp.results version=2
1:26PM WRN request ID is missing, null or empty Operation=TrackPayload
1:26PM INF Message consumed duration=3 offset=7

When log level is set to DEBUG, many log messages useful for debugging are generated as well:

2:53PM INF started processing message message_timestamp=2023-07-26T14:53:32+02:00 offset=8 partition=0 topic=ccx.ocp.results
2:53PM INF Consumed group=aggregator offset=8 topic=ccx.ocp.results
2:53PM INF Read cluster=5d5892d3-1f74-4ccf-91af-548dfc9767aa offset=8 organization=11789772 partition=0 request ID=missing topic=ccx.ocp.results version=2
2:53PM WRN Received data with unexpected version. cluster=5d5892d3-1f74-4ccf-91af-548dfc9767aa offset=8 organization=11789772 partition=0 topic=ccx.ocp.results version=2
2:53PM DBG Organization allow listing disabled cluster=5d5892d3-1f74-4ccf-91af-548dfc9767aa offset=8 organization=11789772 partition=0 request ID=missing topic=ccx.ocp.results version=2
2:53PM DBG Marshalled cluster=5d5892d3-1f74-4ccf-91af-548dfc9767aa offset=8 organization=11789772 partition=0 request ID=missing topic=ccx.ocp.results version=2
2:53PM DBG Time ok cluster=5d5892d3-1f74-4ccf-91af-548dfc9767aa offset=8 organization=11789772 partition=0 request ID=missing topic=ccx.ocp.results version=2
2:53PM DBG Stored report cluster=5d5892d3-1f74-4ccf-91af-548dfc9767aa offset=8 organization=11789772 partition=0 request ID=missing topic=ccx.ocp.results version=2
2:53PM DBG Stored recommendations cluster=5d5892d3-1f74-4ccf-91af-548dfc9767aa offset=8 organization=11789772 partition=0 request ID=missing topic=ccx.ocp.results version=2
2:53PM DBG rule hits for 11789772.5d5892d3-1f74-4ccf-91af-548dfc9767aa (request ID missing):

rule: ccx_rules_ocp.external.rules.nodes_requirements_check.report; error key: NODES_MINIMUM_REQUIREMENTS_NOT_MET
rule: ccx_rules_ocp.external.bug_rules.bug_1766907.report; error key: BUGZILLA_BUG_1766907
rule: ccx_rules_ocp.external.rules.nodes_kubelet_version_check.report; error key: NODE_KUBELET_VERSION
rule: ccx_rules_ocp.external.rules.samples_op_failed_image_import_check.report; error key: SAMPLES_FAILED_IMAGE_IMPORT_ERR
rule: ccx_rules_ocp.external.rules.cluster_wide_proxy_auth_check.report; error key: AUTH_OPERATOR_PROXY_ERROR

2:53PM DBG rule hits for 11789772.5d5892d3-1f74-4ccf-91af-548dfc9767aa (request ID missing):

rule: ccx_rules_ocp.external.rules.nodes_requirements_check.report; error key: NODES_MINIMUM_REQUIREMENTS_NOT_MET
rule: ccx_rules_ocp.external.bug_rules.bug_1766907.report; error key: BUGZILLA_BUG_1766907
rule: ccx_rules_ocp.external.rules.nodes_kubelet_version_check.report; error key: NODE_KUBELET_VERSION
rule: ccx_rules_ocp.external.rules.samples_op_failed_image_import_check.report; error key: SAMPLES_FAILED_IMAGE_IMPORT_ERR
rule: ccx_rules_ocp.external.rules.cluster_wide_proxy_auth_check.report; error key: AUTH_OPERATOR_PROXY_ERROR

2:53PM INF Stored info report cluster=5d5892d3-1f74-4ccf-91af-548dfc9767aa offset=8 organization=11789772 partition=0 request ID=missing topic=ccx.ocp.results version=2
2:53PM DBG read duration=2287 offset=8
2:53PM DBG org_filtering duration=440 offset=8
2:53PM DBG marshalling duration=2023 offset=8
2:53PM DBG time_check duration=120 offset=8
2:53PM DBG db_store_report duration=119 offset=8
2:53PM DBG db_store_recommendations duration=11 offset=8
2:53PM DBG db_store_info_report duration=102 offset=8
2:53PM WRN request ID is missing, null or empty Operation=TrackPayload
2:53PM WRN request ID is missing, null or empty Operation=TrackPayload
2:53PM DBG processing of message took '0.005895183' seconds offset=8 partition=0 topic=ccx.ocp.results
2:53PM WRN request ID is missing, null or empty Operation=TrackPayload
2:53PM INF Message consumed duration=6 offset=8

func (*KafkaConsumer) Serve

func (consumer *KafkaConsumer) Serve()

Serve starts listening for messages and processing them. It blocks the current thread.
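Because Serve blocks, a common pattern is to trigger shutdown from another goroutine. The sketch below uses a hypothetical mock stand-in for KafkaConsumer (a real consumer would be wired to Kafka via Sarama) just to illustrate the blocking Serve / asynchronous Close interplay:

```go
package main

import (
	"fmt"
	"time"
)

// mockConsumer is a stand-in for KafkaConsumer: Serve blocks until Close
// signals shutdown, matching the documented blocking behaviour.
type mockConsumer struct {
	done chan struct{}
}

// Serve blocks the current goroutine until Close is called.
func (c *mockConsumer) Serve() {
	<-c.done
	fmt.Println("serve loop finished")
}

// Close releases the consumer's resources and unblocks Serve.
func (c *mockConsumer) Close() error {
	close(c.done)
	return nil
}

func main() {
	consumer := &mockConsumer{done: make(chan struct{})}

	// Shut the consumer down from another goroutine, since Serve blocks.
	go func() {
		time.Sleep(100 * time.Millisecond)
		if err := consumer.Close(); err != nil {
			panic(err)
		}
	}()

	consumer.Serve() // returns only after Close is called
}
```

In a real service the shutdown goroutine would typically wait on an OS signal (e.g. via os/signal) rather than a timer.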

func (*KafkaConsumer) Setup

func (consumer *KafkaConsumer) Setup(sarama.ConsumerGroupSession) error

Setup is run at the beginning of a new session, before ConsumeClaim

type MessageProcessor added in v1.4.0

type MessageProcessor interface {
	// contains filtered or unexported methods
}

MessageProcessor offers the interface for processing a received message

type OCPRulesProcessor added in v1.4.0

type OCPRulesProcessor struct {
}

OCPRulesProcessor satisfies the MessageProcessor interface

type Report

type Report map[string]*json.RawMessage

Report represents report sent in a message consumed from any broker
