kafka

package module v0.0.10

Published: Jan 3, 2024 License: Apache-2.0 Imports: 20

README

kafka-client

kafka-client is a high-level Kafka API for consuming and producing messages, built on top of kafka-go.

Features

Kafka consumer
  • Customized message handler
  • Multi-topic consumption (customizable get-topics function)
  • Concurrent worker pool for consuming messages (flexible worker pool size)
  • Consume-error collection (when there are too many errors, the consumer cancels itself)
  • Consumption-frequency detection (reconnects to Kafka when the message frequency is too low)
  • Safe manual close
  • Asynchronous cancellations and timeouts using Go contexts
  • Custom logger and verbose option
  • SASL authentication
  • TLS authentication
Kafka producer
  • Asynchronous / synchronous message writes
  • Safe manual close
  • Asynchronous cancellations and timeouts using Go contexts
  • Custom logger and verbose option
  • SASL authentication
  • TLS authentication

Usage

For basic usage examples, refer to the system test cases in the test directory.

To run the system tests, you need a Kafka broker (either set auto.create.topics.enable to true, or create topics unit-test-topic-1 ... unit-test-topic-10 before starting the tests) and the environment variables below:

  • KAFKA_BOOTSTRAP: Kafka broker address
  • KAFKA_SASL_USERNAME (optional): Kafka SASL username
  • KAFKA_SASL_PASSWORD (optional): Kafka SASL password
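The KAFKA_BOOTSTRAP value uses the same format as the Bootstrap config field, so multiple brokers are comma-separated. A minimal sketch of reading and parsing it (the brokerList helper is hypothetical, not part of the package):

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// brokerList parses a KAFKA_BOOTSTRAP-style value into individual broker
// addresses, falling back to the documented default when the value is empty.
func brokerList(bootstrap string) []string {
	if bootstrap == "" {
		bootstrap = "localhost:9092" // documented default
	}
	// Multiple brokers are comma-separated, e.g. "localhost:9092,localhost:9093".
	return strings.Split(bootstrap, ",")
}

func main() {
	brokers := brokerList(os.Getenv("KAFKA_BOOTSTRAP"))
	fmt.Println(brokers)
}
```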

SASL & TLS

To use SASL or TLS, set Mechanism or TLS in ConsumerConfig or ProducerConfig. Using ConsumerConfig as an example:

package main

import (
	"context"
	"crypto/tls"
	"log"
	"os"

	kc "github.com/Kevinello/kafka-client"
	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/scram"
)

func main() {
	mechanism, err := scram.Mechanism(scram.SHA512, "username", "password")
	if err != nil {
		log.Fatal(err)
	}
	config := kc.ConsumerConfig{
		GroupID: "test-group",
		MessageHandler: func(msg *kafka.Message, consumer *kc.Consumer) (err error) {
			log.Printf("message: %s\n", string(msg.Value))
			return
		},
		Mechanism: mechanism,
		TLS:       &tls.Config{},
	}
	consumer, err := kc.NewConsumer(context.Background(), config)
	if err != nil {
		log.Fatal(err)
	}
	// Block until the consumer closes itself (e.g. after too many consume errors).
	<-consumer.Closed()
	os.Exit(1)
}

Replace the username and password with your own, and fill TLS with your own TLS config.

Project Structure

(project structure diagram)

Documentation

Overview

Package kafka manages Kafka clients.

@update 2023-03-28 02:01:25

Index

Constants

const (
	// MechanismSASLPlain SASL PLAINTEXT authentication
	//	@update 2024-01-03 06:54:55
	MechanismSASLPlain = "SASL_PLAINTEXT"

	// MechanismSASLSCRAMSHA256 SASL SCRAM-SHA-256 authentication
	//	@update 2024-01-03 06:55:11
	MechanismSASLSCRAMSHA256 = "SASL_SCRAM_SHA_256"

	// MechanismSASLSCRAMSHA512 SASL SCRAM-SHA-512 authentication
	//	@update 2024-01-03 06:55:17
	MechanismSASLSCRAMSHA512 = "SASL_SCRAM_SHA_512"
)

Mechanism constants for SASL authentication.
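These constants are plain strings, so config code can validate a mechanism name before wiring up SASL. A small sketch (the validMechanism helper is hypothetical; the constants are copied from the package for a self-contained example):

```go
package main

import "fmt"

// Constants copied from the kafka-client package for illustration.
const (
	MechanismSASLPlain       = "SASL_PLAINTEXT"
	MechanismSASLSCRAMSHA256 = "SASL_SCRAM_SHA_256"
	MechanismSASLSCRAMSHA512 = "SASL_SCRAM_SHA_512"
)

// validMechanism reports whether s names a supported SASL mechanism.
func validMechanism(s string) bool {
	switch s {
	case MechanismSASLPlain, MechanismSASLSCRAMSHA256, MechanismSASLSCRAMSHA512:
		return true
	}
	return false
}

func main() {
	fmt.Println(validMechanism("SASL_SCRAM_SHA_512")) // true
}
```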

Variables

var (
	// ErrNoTopics GetTopicsFunc got no topics
	//	@update 2023-03-14 01:18:01
	ErrNoTopics = errors.New("no topics found")

	// ErrTooManyConsumeError consumer.ErrCount reached MaxConsumeErrorCount
	//	@update 2023-03-15 02:02:27
	ErrTooManyConsumeError = errors.New("too many errors when consuming data")

	// ErrClosedConsumer error when trying to close a closed consumer
	//	@update 2023-03-15 02:03:09
	ErrClosedConsumer = errors.New("the consumer has been closed")

	// ErrClosedProducer error when trying to close a closed producer
	//	@update 2023-03-30 05:12:28
	ErrClosedProducer = errors.New("the producer has been closed")
)

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	ConsumerConfig
	// contains filtered or unexported fields
}

Consumer struct holds data related to the consumer

@author kevineluo
@update 2023-02-24 01:53:37

func NewConsumer

func NewConsumer(ctx context.Context, config ConsumerConfig) (c *Consumer, err error)

NewConsumer creates a new Kafka consumer.

@param ctx context.Context
@param config ConsumerConfig
@return c *Consumer
@return err error
@author kevineluo
@update 2023-03-14 01:12:16

func (*Consumer) CheckState added in v0.0.4

func (consumer *Consumer) CheckState()

CheckState checks the consumer state

@receiver consumer *Consumer
@author kevineluo
@update 2023-04-18 01:34:40

func (*Consumer) Close

func (consumer *Consumer) Close() error

Close manually closes the consumer

@receiver consumer *Consumer
@author kevineluo
@update 2023-03-15 01:52:18

func (*Consumer) Closed added in v0.0.2

func (consumer *Consumer) Closed() <-chan struct{}

Closed provides consumer.context.Done() to check whether the consumer is closed

@receiver consumer *Consumer
@return <-chan struct{}
@author kevineluo
@update 2023-03-30 05:11:33

type ConsumerConfig

type ConsumerConfig struct {
	// the Kafka broker address, the default value is "localhost:9092".
	// If you have multiple brokers, you can use a comma to separate them, such as "localhost:9092,localhost:9093,localhost:9094".
	// default: "localhost:9092"
	Bootstrap string `json:"bootstrap"`

	// Group ID of consumer
	GroupID string `json:"group_id"`

	// If no message is received within MaxMsgInterval, the consumer restarts, default: 300 seconds
	MaxMsgInterval time.Duration `json:"max_msg_interval"`

	// Interval for consumer to sync topics, default: 15 seconds
	SyncTopicInterval time.Duration `json:"sync_topic_interval"`

	// Maximum number of goroutines for subscribing to topics, default: runtime.NumCPU()
	MaxConsumeGoroutines int `json:"max_consume_goroutines"`

	// Max error count for consuming messages; set to -1 to ignore errors, default: 5
	MaxConsumeErrorCount int `json:"max_consume_error_count"`

	// function which handles received messages from the Kafka broker
	MessageHandler MessageHandler `json:"-"`

	// Function used to sync topics, default: GetAllTopic
	GetTopics GetTopicsFunc `json:"-"`

	// logger implementing logr.LogSink, default: zapr.Logger
	Logger *logr.Logger `json:"-"`

	// Mechanism sasl authentication, default: nil
	Mechanism sasl.Mechanism `json:"-"`

	// TLS TLS configuration, default: nil
	TLS *tls.Config `json:"-"`

	// used when Config.logger is nil, follow the zap style level(https://pkg.go.dev/go.uber.org/zap@v1.24.0/zapcore#Level),
	// setting the log level for zapr.Logger(config.logLevel should be in range[-1, 5])
	// default: 0 -- InfoLevel
	LogLevel int `json:"log_level"`

	// enable verbose kafka-go log, default: false
	Verbose bool `json:"verbose"`
}

ConsumerConfig configuration object used to create new instances of Consumer

@author kevineluo
@update 2023-03-15 03:01:48

func (*ConsumerConfig) Validate

func (config *ConsumerConfig) Validate() (err error)

Validate checks the config and sets default values

@receiver config *ConsumerConfig
@return err error
@author kevineluo
@update 2023-03-31 04:44:47

type GetTopicsFunc

type GetTopicsFunc func(broker string, mechanism sasl.Mechanism) (topics []string, err error)

GetTopicsFunc is the way to get the needed topics (implemented and provided by the user)

@return topics []string
@return err error
@author kevineluo
@update 2023-03-28 07:16:54

func GetAllTopic

func GetAllTopic() GetTopicsFunc

GetAllTopic is a function decorator that gets all topics, returning a GetTopicsFunc

@return GetTopicsFunc
@author kevineluo
@update 2023-03-15 03:14:57

func GetTopicReMatch

func GetTopicReMatch(reList []string) GetTopicsFunc

GetTopicReMatch is a function decorator that returns a GetTopicsFunc keeping only the topics that match a regular expression in reList.

@param reList []string
@return GetTopicsFunc
@author kevineluo
@update 2023-03-29 03:22:56
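The filtering that the returned GetTopicsFunc performs can be illustrated with the standard library's regexp package. The matchTopics helper below is a hypothetical stand-in for the matching step, applied after listing all topics from the broker:

```go
package main

import (
	"fmt"
	"regexp"
)

// matchTopics keeps the topics that match any pattern in reList,
// mirroring the filtering a regex-based GetTopicsFunc would perform.
func matchTopics(all []string, reList []string) ([]string, error) {
	var matched []string
	for _, expr := range reList {
		re, err := regexp.Compile(expr)
		if err != nil {
			return nil, err
		}
		for _, t := range all {
			if re.MatchString(t) {
				matched = append(matched, t)
			}
		}
	}
	return matched, nil
}

func main() {
	topics, _ := matchTopics(
		[]string{"unit-test-topic-1", "orders", "unit-test-topic-2"},
		[]string{`^unit-test-topic-\d+$`},
	)
	fmt.Println(topics) // [unit-test-topic-1 unit-test-topic-2]
}
```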

type MessageHandler

type MessageHandler func(msg *kafka.Message, consumer *Consumer) (err error)

MessageHandler function which handles received messages from the Kafka broker.

@param msg *kafka.Message
@param consumer *Consumer
@return err error
@author kevineluo
@update 2023-03-28 07:16:44

type Producer

type Producer struct {
	ProducerConfig
	// contains filtered or unexported fields
}

Producer struct holds data related to the producer

@author kevineluo
@update 2023-03-15 10:30:44

func NewProducer

func NewProducer(ctx context.Context, config ProducerConfig) (p *Producer, err error)

NewProducer creates a new Kafka producer.

@param ctx context.Context
@param config ProducerConfig
@return p *Producer
@return err error
@author kevineluo
@update 2023-03-15 10:52:06

NewProducer creates a new producer with the given config. A producer is a wrapper around a kafka writer with some additional features, such as topic auto-creation and message batching. Note that the producer is not thread-safe: if you want to use one producer in multiple goroutines, you must do the synchronization yourself.

func (*Producer) Close

func (producer *Producer) Close() error

Close closes the producer

@receiver producer *Producer
@return error
@author kevineluo
@update 2023-03-15 02:43:18

func (*Producer) Closed added in v0.0.2

func (producer *Producer) Closed() <-chan struct{}

Closed checks whether the Producer is closed

@receiver producer *Producer
@return <-chan struct{}
@author kevineluo
@update 2023-03-30 05:11:40

func (*Producer) WriteMessages

func (producer *Producer) WriteMessages(ctx context.Context, msgs ...kafka.Message) (err error)

WriteMessages writes messages to Kafka; note that every message must set its own topic

@receiver producer *Producer
@param ctx context.Context
@param msgs ...kafka.Message
@return err error
@author kevineluo
@update 2023-03-29 02:46:03

type ProducerConfig

type ProducerConfig struct {
	// kafka bootstrap, default: "localhost:9092"
	Bootstrap string `json:"bootstrap"`

	// determine synchronously / asynchronously write messages, default: false
	Async bool `json:"async"`

	// By default Kafka has auto.create.topics.enable='true', in which case you can ignore this config.
	// When this config is enabled, the producer will attempt to create the topic prior to publishing the message;
	// otherwise it will return an error when it meets a missing topic,
	// default: false
	AllowAutoTopicCreation bool `json:"allow_auto_topic_creation"`

	// logger implementing logr.LogSink, default: zapr.Logger
	Logger *logr.Logger `json:"-"`

	// Mechanism sasl authentication, default: nil
	Mechanism sasl.Mechanism `json:"-"`

	// TLS TLS configuration, default: nil
	TLS *tls.Config `json:"-"`

	// used when Config.logger is nil, follow the zap style level(https://pkg.go.dev/go.uber.org/zap@v1.24.0/zapcore#Level),
	// setting the log level for zapr.Logger(config.logLevel should be in range[-1, 5])
	// default: 0 -- InfoLevel
	LogLevel int `json:"log_level"`

	// enable verbose kafka-go log, default: false
	Verbose bool `json:"verbose"`
}

ProducerConfig configuration object used to create new instances of Producer

@author kevineluo
@update 2023-03-15 03:01:48

func (*ProducerConfig) Validate

func (config *ProducerConfig) Validate() (err error)

Validate checks the config and sets default values

@receiver config *ProducerConfig
@return err error
@author kevineluo
@update 2023-03-15 03:19:23
