kafka

package module
v1.8.9
Published: Dec 10, 2023 License: MIT Imports: 26 Imported by: 0

README

Kafka Konsumer

Description

Kafka Konsumer provides an easy-to-use Kafka consumer implementation with a built-in retry/exception manager (kafka-cronsumer).

Guide

Installation
go get github.com/Trendyol/kafka-konsumer@latest
Examples

You can find a number of ready-to-run examples in the examples directory.

After running the docker-compose up command, you can run any of the example applications.

Simple Consumer
package main

import (
    "fmt"

    kafka "github.com/Trendyol/kafka-konsumer"
)

func main() {
    consumerCfg := &kafka.ConsumerConfig{
        Reader: kafka.ReaderConfig{
            Brokers: []string{"localhost:29092"},
            Topic:   "standart-topic",
            GroupID: "standart-cg",
        },
        ConsumeFn:    consumeFn,
        RetryEnabled: false,
    }

    consumer, err := kafka.NewConsumer(consumerCfg)
    if err != nil {
        panic(err)
    }
    defer consumer.Stop()

    // Consume starts the consumer; see the shutdown sketch below for
    // keeping the process alive.
    consumer.Consume()
}

func consumeFn(message kafka.Message) error {
    fmt.Printf("Message From %s with value %s\n", message.Topic, string(message.Value))
    return nil
}
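Note that Consume starts the consumer and returns, so the snippet above would stop almost immediately via the deferred Stop. Below is a minimal sketch of keeping the process alive until an interrupt, assuming Consume is non-blocking as in the repository's runnable examples; the signal wiring is illustrative, not part of the library:

package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"

    kafka "github.com/Trendyol/kafka-konsumer"
)

func main() {
    consumerCfg := &kafka.ConsumerConfig{
        Reader: kafka.ReaderConfig{
            Brokers: []string{"localhost:29092"},
            Topic:   "standart-topic",
            GroupID: "standart-cg",
        },
        ConsumeFn: func(message kafka.Message) error {
            fmt.Printf("Message From %s with value %s\n", message.Topic, string(message.Value))
            return nil
        },
    }

    consumer, err := kafka.NewConsumer(consumerCfg)
    if err != nil {
        panic(err)
    }

    consumer.Consume() // starts the consumer goroutines and returns

    // Block until SIGINT/SIGTERM, then stop gracefully to avoid data loss.
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
    <-sigs
    consumer.Stop()
}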
Simple Consumer With Retry/Exception Option
func main() {
    consumerCfg := &kafka.ConsumerConfig{
        Reader: kafka.ReaderConfig{
            Brokers: []string{"localhost:29092"},
            Topic:   "standart-topic",
            GroupID: "standart-cg",
        },
        RetryEnabled: true,
        RetryConfiguration: kafka.RetryConfiguration{
            Topic:         "retry-topic",
            StartTimeCron: "*/1 * * * *",
            WorkDuration:  50 * time.Second,
            MaxRetry:      3,
        },
        ConsumeFn: consumeFn,
    }

    consumer, err := kafka.NewConsumer(consumerCfg)
    if err != nil {
        panic(err)
    }
    defer consumer.Stop()
    
    consumer.Consume()
}

func consumeFn(message kafka.Message) error {
    fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value))
    return nil
}
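The retry flow is driven by the error you return: when ConsumeFn returns a non-nil error, the message is produced to the configured retry topic and kafka-cronsumer re-attempts it on the StartTimeCron schedule, up to MaxRetry times. A hedged sketch (the JSON payload shape and validation rule are hypothetical; assumes encoding/json, errors, and fmt are imported):

func consumeFn(message kafka.Message) error {
    var order struct {
        ID string `json:"id"`
    }
    if err := json.Unmarshal(message.Value, &order); err != nil {
        // Returning an error sends this message to "retry-topic",
        // where it is retried up to MaxRetry (here 3) times.
        return fmt.Errorf("unmarshal order: %w", err)
    }
    if order.ID == "" {
        return errors.New("order without id") // also routed to retry
    }
    fmt.Printf("processed order %s\n", order.ID)
    return nil // success: nothing is sent to the retry topic
}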
With Batch Option
func main() {
    consumerCfg := &kafka.ConsumerConfig{
        Reader: kafka.ReaderConfig{
            Brokers: []string{"localhost:29092"},
            Topic:   "standart-topic",
            GroupID: "standart-cg",
        },
        LogLevel:     kafka.LogLevelDebug,
        RetryEnabled: true,
        RetryConfiguration: kafka.RetryConfiguration{
            Brokers:       []string{"localhost:29092"},
            Topic:         "retry-topic",
            StartTimeCron: "*/1 * * * *",
            WorkDuration:  50 * time.Second,
            MaxRetry:      3,
        },
        BatchConfiguration: &kafka.BatchConfiguration{
            MessageGroupLimit:    1000,
            MessageGroupDuration: time.Second,
            BatchConsumeFn:       batchConsumeFn,
        },
    }

    consumer, err := kafka.NewConsumer(consumerCfg)
    if err != nil {
        panic(err)
    }
    defer consumer.Stop()

    consumer.Consume()
}

func batchConsumeFn(messages []kafka.Message) error {
    fmt.Printf("%d\n comes first %s", len(messages), messages[0].Value)
    return nil
}
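With the batch option, messages are handed to BatchConsumeFn once either MessageGroupLimit messages have accumulated or MessageGroupDuration has elapsed, whichever comes first. A sketch of a batch handler that fails the whole batch when any message cannot be processed (processOne is a hypothetical helper):

func batchConsumeFn(messages []kafka.Message) error {
    for i := range messages {
        if err := processOne(&messages[i]); err != nil {
            // A non-nil error marks the batch as unprocessed (see the
            // kafka_konsumer_unprocessed_* metrics below) and, with
            // RetryEnabled, hands the messages to the retry flow.
            return fmt.Errorf("offset %d: %w", messages[i].Offset, err)
        }
    }
    return nil
}

// processOne is a hypothetical per-message handler.
func processOne(m *kafka.Message) error {
    fmt.Printf("partition %d offset %d: %s\n", m.Partition, m.Offset, string(m.Value))
    return nil
}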
With Distributed Tracing Support

Please refer to the Tracing Example in the repository.
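As a rough sketch of the configuration surface, using only the documented fields (tp and prop stand in for whatever your own OpenTelemetry bootstrap produces; if left unset, the defaults otel.GetTracerProvider() and otel.GetTextMapPropagator() from the configuration table below apply):

consumerCfg := &kafka.ConsumerConfig{
    Reader: kafka.ReaderConfig{
        Brokers: []string{"localhost:29092"},
        Topic:   "standart-topic",
        GroupID: "standart-cg",
    },
    ConsumeFn:                 consumeFn,
    DistributedTracingEnabled: true,
    DistributedTracingConfiguration: kafka.DistributedTracingConfiguration{
        TracerProvider: tp,   // trace.TracerProvider from your otel setup
        Propagator:     prop, // propagation.TextMapPropagator
    },
}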

With Grafana & Prometheus

In this example, we demonstrate how to create a Grafana dashboard and how to define alerts in Prometheus. To see it in action, go to the with-grafana folder under examples, start the infrastructure with docker compose up, and then run the application.


With SASL-PLAINTEXT Authentication

Under the examples/with-sasl-plaintext folder, you can find an example of consumer integration with the SASL/PLAIN mechanism. To try it, run docker compose up in that folder and then start the application.
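For reference, a minimal sketch of the SASL settings, using the documented SASLConfig fields and Mechanism constants (credentials are placeholders):

consumerCfg := &kafka.ConsumerConfig{
    Reader: kafka.ReaderConfig{
        Brokers: []string{"localhost:29092"},
        Topic:   "standart-topic",
        GroupID: "standart-cg",
    },
    ConsumeFn: consumeFn,
    SASL: &kafka.SASLConfig{
        Type:     kafka.MechanismPlain, // or kafka.MechanismScram
        Username: "client",
        Password: "client-secret",
    },
}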

Configurations

config | description | default
reader | Describes all segmentio kafka reader configurations |
consumeFn | Kafka consumer function; if retry is enabled, it is also used to consume retriable messages |
logLevel | Describes log level; valid options are debug, info, warn, and error | info
concurrency | Number of goroutines used at listeners | 1
retryEnabled | Whether the retry/exception consumer is enabled | false
commitInterval | Interval at which offsets are committed to the broker | 1s
rack | see doc |
clientId | see doc |
dial.Timeout | see doc | no timeout
dial.KeepAlive | see doc | not enabled
transport.DialTimeout | see doc | 5s
transport.IdleTimeout | see doc | 30s
transport.MetadataTTL | see doc | 6s
transport.MetadataTopics | see doc | all topics in cluster
distributedTracingEnabled | Enables OpenTelemetry support for consume and produce operations | false
distributedTracingConfiguration.TracerProvider | see doc | otel.GetTracerProvider()
distributedTracingConfiguration.Propagator | see doc | otel.GetTextMapPropagator()
retryConfiguration.clientId | see doc |
retryConfiguration.startTimeCron | Cron expression that determines when the retry consumer (kafka-cronsumer) starts working |
retryConfiguration.workDuration | How long the exception consumer actively consumes messages |
retryConfiguration.topic | Retry/exception topic name |
retryConfiguration.brokers | Retry topic broker URLs |
retryConfiguration.maxRetry | Maximum number of retry attempts for a message | 3
retryConfiguration.tls.rootCAPath | see doc | ""
retryConfiguration.tls.intermediateCAPath | Used together with rootCAPath when a second root CA must be specified | ""
retryConfiguration.sasl.authType | SCRAM or PLAIN |
retryConfiguration.sasl.username | SCRAM or PLAIN username |
retryConfiguration.sasl.password | SCRAM or PLAIN password |
batchConfiguration.messageGroupLimit | Maximum number of messages in a batch |
batchConfiguration.messageGroupDuration | Maximum time to wait for a batch |
tls.rootCAPath | see doc | ""
tls.intermediateCAPath | Used together with rootCAPath when a second root CA must be specified | ""
sasl.authType | SCRAM or PLAIN |
sasl.username | SCRAM or PLAIN username |
sasl.password | SCRAM or PLAIN password |
logger | Custom logger implementation (LoggerInterface) | info
apiEnabled | Enables the metrics API | false
apiConfiguration.port | API port | 8090
apiConfiguration.healthCheckPath | Health check path | /healthcheck
metricConfiguration.path | Metrics endpoint path | /metrics
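To make the table concrete, here is a sketch combining several of these knobs on one consumer (values are illustrative, not recommendations; assumes time and the kafka package are imported along with a consumeFn like the ones above):

consumerCfg := &kafka.ConsumerConfig{
    Reader: kafka.ReaderConfig{
        Brokers: []string{"localhost:29092"},
        Topic:   "standart-topic",
        GroupID: "standart-cg",
    },
    ConsumeFn:      consumeFn,
    Concurrency:    10,              // listener goroutines, default 1
    CommitInterval: 5 * time.Second, // offset commit interval, default 1s
    LogLevel:       kafka.LogLevelWarn,
    ClientID:       "my-service",    // hypothetical client id
    Rack:           "rack-1",        // hypothetical rack
}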

Monitoring

Kafka Konsumer offers an API that exposes several metrics.

Exposed Metrics
Metric Name | Description | Value Type
kafka_konsumer_processed_messages_total_current | Total number of processed messages | Counter
kafka_konsumer_unprocessed_messages_total_current | Total number of unprocessed messages | Counter
kafka_konsumer_processed_batch_messages_total_current | Total number of processed batch messages | Counter
kafka_konsumer_unprocessed_batch_messages_total_current | Total number of unprocessed batch messages | Counter

NOTE: kafka_konsumer_processed_batch_messages_total_current and kafka_konsumer_unprocessed_batch_messages_total_current will be deprecated in upcoming releases. Please use kafka_konsumer_processed_messages_total_current and kafka_konsumer_unprocessed_messages_total_current instead.
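To expose these metrics, enable the API and, optionally, override the port and paths; the pointer-typed APIConfiguration and MetricConfiguration fields take addresses of local variables. A sketch using only documented fields; Prometheus would then scrape http://localhost:8090/metrics:

port := 8090
healthPath := "/healthcheck"
metricsPath := "/metrics"

consumerCfg := &kafka.ConsumerConfig{
    Reader: kafka.ReaderConfig{
        Brokers: []string{"localhost:29092"},
        Topic:   "standart-topic",
        GroupID: "standart-cg",
    },
    ConsumeFn:  consumeFn,
    APIEnabled: true,
    APIConfiguration: kafka.APIConfiguration{
        Port:            &port,       // default 8090
        HealthCheckPath: &healthPath, // default /healthcheck
    },
    MetricConfiguration: kafka.MetricConfiguration{
        Path: &metricsPath, // default /metrics
    },
}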

Documentation

Index

Constants

const (
	MechanismScram = "scram"
	MechanismPlain = "plain"
)
const Name = "kafka_konsumer"

Variables

This section is empty.

Functions

func NewMetricMiddleware added in v1.3.3

func NewMetricMiddleware(cfg *ConsumerConfig,
	app *fiber.App,
	consumerMetric *ConsumerMetric,
	metricCollectors ...prometheus.Collector,
) (func(ctx *fiber.Ctx) error, error)
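
A sketch of mounting the middleware on your own fiber application instead of relying on the built-in API (assumes github.com/gofiber/fiber/v2 is imported as fiber and consumerCfg is a *ConsumerConfig like those in the examples above; how the ConsumerMetric instance is shared with a running consumer is not shown here, so treat the standalone value as illustrative):

app := fiber.New()
consumerMetric := &kafka.ConsumerMetric{}

middleware, err := kafka.NewMetricMiddleware(consumerCfg, app, consumerMetric)
if err != nil {
	panic(err)
}
// The returned handler wires the kafka_konsumer_* metrics into the app.
app.Use(middleware)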

Types

type API

type API interface {
	Start()
	Stop()
}

func NewAPI

func NewAPI(cfg *ConsumerConfig, consumerMetric *ConsumerMetric, metricCollectors ...prometheus.Collector) API

type APIConfiguration

type APIConfiguration struct {
	// Port default is 8090
	Port *int

	// HealthCheckPath default is /healthcheck
	HealthCheckPath *string
}

type BatchConfiguration

type BatchConfiguration struct {
	BatchConsumeFn       BatchConsumeFn
	MessageGroupLimit    int
	MessageGroupDuration time.Duration
}

type BatchConsumeFn

type BatchConsumeFn func([]Message) error

type ConsumeFn

type ConsumeFn func(Message) error

type Consumer

type Consumer interface {
	// Consume starts consuming
	Consume()

	// WithLogger for injecting custom log implementation
	WithLogger(logger LoggerInterface)

	// Stop for graceful shutdown. In order to avoid data loss, you have to call it!
	Stop() error
}

func NewConsumer

func NewConsumer(cfg *ConsumerConfig) (Consumer, error)

type ConsumerConfig

type ConsumerConfig struct {
	APIConfiguration                APIConfiguration
	Logger                          LoggerInterface
	MetricConfiguration             MetricConfiguration
	SASL                            *SASLConfig
	TLS                             *TLSConfig
	Dial                            *DialConfig
	BatchConfiguration              *BatchConfiguration
	ConsumeFn                       ConsumeFn
	ClientID                        string
	Rack                            string
	LogLevel                        LogLevel
	Reader                          ReaderConfig
	RetryConfiguration              RetryConfiguration
	CommitInterval                  time.Duration
	DistributedTracingEnabled       bool
	DistributedTracingConfiguration DistributedTracingConfiguration
	Concurrency                     int
	RetryEnabled                    bool
	APIEnabled                      bool
}

type ConsumerMetric added in v1.3.3

type ConsumerMetric struct {
	TotalUnprocessedMessagesCounter int64
	TotalProcessedMessagesCounter   int64
	// Deprecated
	TotalUnprocessedBatchMessagesCounter int64
	// Deprecated
	TotalProcessedBatchMessagesCounter int64
}

type DialConfig added in v1.4.6

type DialConfig struct {
	Timeout   time.Duration
	KeepAlive time.Duration
}

type Dialer

type Dialer struct {
	*kafka.Dialer
}

func (*Dialer) SetSASL

func (t *Dialer) SetSASL(mechanism sasl.Mechanism)

func (*Dialer) SetTLSConfig

func (t *Dialer) SetTLSConfig(config *tls.Config)

type DistributedTracingConfiguration added in v1.7.7

type DistributedTracingConfiguration struct {
	TracerProvider trace.TracerProvider
	Propagator     propagation.TextMapPropagator
}

type Layer

type Layer interface {
	SetTLSConfig(config *tls.Config)
	SetSASL(mechanism sasl.Mechanism)
}

type LogLevel

type LogLevel string
const (
	LogLevelDebug LogLevel = "debug"
	LogLevelInfo  LogLevel = "info"
	LogLevelWarn  LogLevel = "warn"
	LogLevelError LogLevel = "error"
)

type LoggerInterface

type LoggerInterface interface {
	// With returns a logger based off the root logger and decorates it with the given context and arguments.
	With(args ...interface{}) LoggerInterface

	// Debug uses fmt.Sprint to construct and log a message at DEBUG level
	Debug(args ...interface{})
	// Info uses fmt.Sprint to construct and log a message at INFO level
	Info(args ...interface{})
	// Warn uses fmt.Sprint to construct and log a message at WARN level
	Warn(args ...interface{})
	// Error uses fmt.Sprint to construct and log a message at ERROR level
	Error(args ...interface{})

	// Debugf uses fmt.Sprintf to construct and log a message at DEBUG level
	Debugf(format string, args ...interface{})
	// Infof uses fmt.Sprintf to construct and log a message at INFO level
	Infof(format string, args ...interface{})
	// Warnf uses fmt.Sprintf to construct and log a message at WARN level
	Warnf(format string, args ...interface{})
	// Errorf uses fmt.Sprintf to construct and log a message at ERROR level
	Errorf(format string, args ...interface{})

	Infow(msg string, keysAndValues ...interface{})
	Errorw(msg string, keysAndValues ...interface{})
	Warnw(msg string, keysAndValues ...interface{})
}

LoggerInterface is a logger that supports log levels, context and structured logging.

func NewZapLogger

func NewZapLogger(level LogLevel) LoggerInterface
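
A short sketch of injecting the bundled zap-based logger (or any LoggerInterface implementation) via Consumer.WithLogger, assuming consumerCfg is configured as in the examples above:

consumer, err := kafka.NewConsumer(consumerCfg)
if err != nil {
	panic(err)
}
consumer.WithLogger(kafka.NewZapLogger(kafka.LogLevelDebug))
consumer.Consume()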

type Mechanism

type Mechanism string

type Message

type Message struct {
	Topic         string
	Partition     int
	Offset        int64
	HighWaterMark int64
	Key           []byte
	Value         []byte
	Headers       []kafka.Header
	WriterData    interface{}
	Time          time.Time
	// Context To enable distributed tracing support
	Context context.Context
}

func (*Message) AddHeader

func (m *Message) AddHeader(header ...kafka.Header)

func (*Message) Header

func (m *Message) Header(key string) *kafka.Header

func (*Message) RemoveHeader

func (m *Message) RemoveHeader(header kafka.Header)
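
A sketch of the header helpers inside a ConsumeFn; kafka.Header in the signatures above is the segmentio kafka-go type, and the header keys here are hypothetical:

import (
	"fmt"

	kafka "github.com/Trendyol/kafka-konsumer"
	segmentio "github.com/segmentio/kafka-go"
)

func consumeFn(message kafka.Message) error {
	// Look up a single header by key; returns nil if absent.
	if h := message.Header("x-correlation-id"); h != nil {
		fmt.Printf("correlation id: %s\n", string(h.Value))
	}

	// Append a header to the message.
	message.AddHeader(segmentio.Header{Key: "processed-by", Value: []byte("my-service")})
	return nil
}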

type MetricConfiguration

type MetricConfiguration struct {
	// Path default is /metrics
	Path *string
}

type OtelKafkaKonsumerWriter added in v1.7.7

type OtelKafkaKonsumerWriter interface {
	WriteMessage(ctx context.Context, msg segmentio.Message) error
	WriteMessages(ctx context.Context, msgs []segmentio.Message) error
	Close() error
}

type Producer

type Producer interface {
	Produce(ctx context.Context, message Message) error
	ProduceBatch(ctx context.Context, messages []Message) error
	Close() error
}

func NewProducer

func NewProducer(cfg *ProducerConfig) (Producer, error)
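
A minimal producer sketch using only the documented ProducerConfig, WriterConfig, and Message fields; broker and topic values mirror the consumer examples:

package main

import (
	"context"

	kafka "github.com/Trendyol/kafka-konsumer"
)

func main() {
	producer, err := kafka.NewProducer(&kafka.ProducerConfig{
		Writer: kafka.WriterConfig{
			Brokers: []string{"localhost:29092"},
			Topic:   "standart-topic",
		},
	})
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	// Produce a single message; ProduceBatch takes a []kafka.Message.
	err = producer.Produce(context.Background(), kafka.Message{
		Key:   []byte("1"),
		Value: []byte(`{"id": 1}`),
	})
	if err != nil {
		panic(err)
	}
}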

type ProducerConfig

type ProducerConfig struct {
	Transport                       *TransportConfig
	SASL                            *SASLConfig
	TLS                             *TLSConfig
	ClientID                        string
	Writer                          WriterConfig
	DistributedTracingEnabled       bool
	DistributedTracingConfiguration DistributedTracingConfiguration
}

type Reader added in v1.6.7

type Reader interface {
	ReadMessage(ctx context.Context) (*kafka.Message, error)
	Close() error
}

func NewOtelReaderWrapper added in v1.7.7

func NewOtelReaderWrapper(cfg *ConsumerConfig, reader *segmentio.Reader) (Reader, error)

func NewReaderWrapper added in v1.6.7

func NewReaderWrapper(reader *segmentio.Reader) Reader

type ReaderConfig

type ReaderConfig kafka.ReaderConfig

type RetryConfiguration

type RetryConfiguration struct {
	SASL            *SASLConfig
	TLS             *TLSConfig
	ClientID        string
	StartTimeCron   string
	Topic           string
	DeadLetterTopic string
	Rack            string
	Brokers         []string
	MaxRetry        int
	WorkDuration    time.Duration
	LogLevel        LogLevel
}

type SASLConfig

type SASLConfig struct {
	Type     Mechanism
	Username string
	Password string
}

func (*SASLConfig) IsEmpty

func (s *SASLConfig) IsEmpty() bool

func (*SASLConfig) Mechanism

func (s *SASLConfig) Mechanism() (sasl.Mechanism, error)

type TLSConfig

type TLSConfig struct {
	RootCAPath         string
	IntermediateCAPath string
}

func (*TLSConfig) IsEmpty

func (c *TLSConfig) IsEmpty() bool

func (*TLSConfig) TLSConfig

func (c *TLSConfig) TLSConfig() (*tls.Config, error)
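
A short sketch of pointing a consumer at custom CA certificates through these fields (paths are placeholders; consumerCfg is a *ConsumerConfig as in the examples above):

consumerCfg.TLS = &kafka.TLSConfig{
	RootCAPath:         "/etc/ssl/certs/kafka-root-ca.pem",
	IntermediateCAPath: "/etc/ssl/certs/kafka-intermediate-ca.pem", // optional second CA
}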

type Transport

type Transport struct {
	*kafka.Transport
}

func (*Transport) SetSASL

func (t *Transport) SetSASL(mechanism sasl.Mechanism)

func (*Transport) SetTLSConfig

func (t *Transport) SetTLSConfig(config *tls.Config)

type TransportConfig added in v1.4.6

type TransportConfig struct {
	DialTimeout    time.Duration
	IdleTimeout    time.Duration
	MetadataTTL    time.Duration
	MetadataTopics []string
}

type Writer added in v1.6.7

type Writer interface {
	WriteMessages(context.Context, ...kafka.Message) error
	Close() error
}

func NewOtelProducer added in v1.7.7

func NewOtelProducer(cfg *ProducerConfig, writer *segmentio.Writer) (Writer, error)

type WriterConfig

type WriterConfig struct {
	ErrorLogger            kafka.Logger
	Logger                 kafka.Logger
	Balancer               kafka.Balancer
	Completion             func(messages []kafka.Message, err error)
	Topic                  string
	Brokers                []string
	ReadTimeout            time.Duration
	BatchTimeout           time.Duration
	BatchBytes             int64
	WriteTimeout           time.Duration
	RequiredAcks           kafka.RequiredAcks
	BatchSize              int
	WriteBackoffMax        time.Duration
	WriteBackoffMin        time.Duration
	MaxAttempts            int
	Async                  bool
	Compression            kafka.Compression
	AllowAutoTopicCreation bool
}
