kafka

package module
v0.0.7
Published: Dec 14, 2023 License: Apache-2.0 Imports: 18 Imported by: 0

README

kafka-client

kafka-client is a high-level Kafka client API for consuming and producing messages, built on top of kafka-go. A minimal usage sketch follows the feature list below.

Features

Kafka consumer
  • Customized message handler
  • Multi-topic consumption (customizable topic-discovery function)
  • Concurrent worker pool for consuming messages (flexible pool size)
  • Consumption error collection (the consumer cancels itself when too many errors occur)
  • Consumption frequency detection (reconnects to Kafka when message frequency is too low)
  • Safe manual close
  • Asynchronous cancellations and timeouts using Go contexts
  • Custom logger and verbose option
Kafka producer
  • Asynchronous / synchronous message writes
  • Safe manual close
  • Asynchronous cancellations and timeouts using Go contexts
  • Custom logger and verbose option
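
A minimal end-to-end sketch. The module import path, broker address, topic names, and group ID are illustrative assumptions, and it assumes a consumer created by NewConsumer starts consuming in the background:

package main

import (
	"context"
	"log"
	"time"

	kafkago "github.com/segmentio/kafka-go" // the underlying kafka-go library (assumed import path)

	kafkaclient "example.com/kafka-client" // placeholder: replace with this module's real import path
)

func main() {
	ctx := context.Background()

	// Producer: write one message synchronously (Async defaults to false).
	producer, err := kafkaclient.NewProducer(ctx, kafkaclient.ProducerConfig{
		Bootstrap: "localhost:9092",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	err = producer.WriteMessages(ctx, kafkago.Message{
		Topic: "demo-topic", // every message must carry its own topic
		Value: []byte("hello"),
	})
	if err != nil {
		log.Fatal(err)
	}

	// Consumer: log every message from topics matching "demo-.*".
	consumer, err := kafkaclient.NewConsumer(ctx, kafkaclient.ConsumerConfig{
		Bootstrap: "localhost:9092",
		GroupID:   "demo-group",
		GetTopics: kafkaclient.GetTopicReMatch([]string{"demo-.*"}),
		MessageHandler: func(msg *kafkago.Message, c *kafkaclient.Consumer) error {
			log.Printf("topic=%s value=%s", msg.Topic, string(msg.Value))
			return nil
		},
	})
	if err != nil {
		log.Fatal(err)
	}

	time.Sleep(10 * time.Second) // let the consumer run briefly
	consumer.Close()             // safe manual close
}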

Project Structure

(project structure diagram)

Documentation

Overview

Package kafka manages Kafka clients (consumers and producers).

@update 2023-03-28 02:01:25

Index

Constants

This section is empty.

Variables

var (
	// ErrNoTopics is returned when GetTopicsFunc finds no topics
	//	@update 2023-03-14 01:18:01
	ErrNoTopics = errors.New("no topics found")

	// ErrTooManyConsumeError is returned when the consumer's error count reaches MaxConsumeErrorCount
	//	@update 2023-03-15 02:02:27
	ErrTooManyConsumeError = errors.New("too many errors when consuming data")

	// ErrClosedConsumer is returned when trying to close an already-closed consumer
	//	@update 2023-03-15 02:03:09
	ErrClosedConsumer = errors.New("the consumer has been closed")

	// ErrClosedProducer is returned when trying to close an already-closed producer
	//	@update 2023-03-30 05:12:28
	ErrClosedProducer = errors.New("the producer has been closed")
)
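
These are sentinel errors, so callers can match them with errors.Is. A short sketch, assuming the package is imported as kafkaclient and that consumer is an existing *Consumer:

if err := consumer.Close(); errors.Is(err, kafkaclient.ErrClosedConsumer) {
	// the consumer was already closed; nothing to do
}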

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	ConsumerConfig
	// contains filtered or unexported fields
}

Consumer struct holds data related to the consumer

@author kevineluo
@update 2023-02-24 01:53:37

func NewConsumer

func NewConsumer(ctx context.Context, config ConsumerConfig) (c *Consumer, err error)

NewConsumer creates a new Kafka consumer.

@param ctx context.Context
@param config ConsumerConfig
@return c *Consumer
@return err error
@author kevineluo
@update 2023-03-14 01:12:16

func (*Consumer) CheckState added in v0.0.4

func (consumer *Consumer) CheckState()

CheckState checks the consumer's state

@receiver consumer *Consumer
@author kevineluo
@update 2023-04-18 01:34:40

func (*Consumer) Close

func (consumer *Consumer) Close() error

Close manually closes the consumer

@receiver consumer *Consumer
@author kevineluo
@update 2023-03-15 01:52:18

func (*Consumer) Closed added in v0.0.2

func (consumer *Consumer) Closed() <-chan struct{}

Closed exposes consumer.context.Done() so callers can check whether the consumer is closed

@receiver consumer *Consumer
@return <-chan struct{}
@author kevineluo
@update 2023-03-30 05:11:33
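
A sketch of waiting for the consumer to stop, for example after it cancels itself due to too many consume errors; the timeout is illustrative:

select {
case <-consumer.Closed():
	log.Println("consumer stopped (closed manually or cancelled internally)")
case <-time.After(time.Minute):
	log.Println("consumer still running after one minute")
}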

type ConsumerConfig

type ConsumerConfig struct {
	// the Kafka broker address, the default value is "localhost:9092".
	// If you have multiple brokers, you can use a comma to separate them, such as "localhost:9092,localhost:9093,localhost:9094".
	// default: "localhost:9092"
	Bootstrap string `json:"bootstrap,omitempty"`

	// Group ID of consumer
	GroupID string `json:"group_id,omitempty"`

	// If no message is received within [MaxMsgInterval], the Consumer restarts, default: 300 seconds
	MaxMsgInterval time.Duration `json:"max_msg_interval,omitempty"`

	// Interval for consumer to sync topics, default: 15 seconds
	SyncTopicInterval time.Duration `json:"sync_topic_interval,omitempty"`

	// Maximum number of goroutines for subscribing to topics, default: runtime.NumCPU()
	MaxConsumeGoroutines int `json:"max_consume_goroutines,omitempty"`

	// Maximum error count while consuming messages; set to -1 to ignore errors, default: 5
	MaxConsumeErrorCount int `json:"max_consume_error_count,omitempty"`

	// function which handles received messages from the Kafka broker
	MessageHandler MessageHandler `json:"-"`

	// Function used to sync topics, default: GetAllTopic
	GetTopics GetTopicsFunc `json:"-"`

	// logger backed by a logr.LogSink implementation; default: a zapr.Logger
	Logger *logr.Logger `json:"-"`

	// used when Logger is nil; follows zap-style levels (https://pkg.go.dev/go.uber.org/zap@v1.24.0/zapcore#Level)
	// and sets the log level for the default zapr.Logger (LogLevel should be in the range [-1, 5])
	// default: 0 -- InfoLevel
	LogLevel int `json:"log_level,omitempty"`

	// enable verbose kafka-go log, default: false
	Verbose bool `json:"verbose,omitempty"`
}

ConsumerConfig configuration object used to create new instances of Consumer

@author kevineluo
@update 2023-03-15 03:01:48

func (*ConsumerConfig) Validate

func (config *ConsumerConfig) Validate() (err error)

Validate checks the config and sets default values

@receiver config *ConsumerConfig
@return err error
@author kevineluo
@update 2023-03-31 04:44:47
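
A sketch of an explicitly tuned ConsumerConfig, using the kafkaclient and kafkago aliases from the earlier sketch. The values are illustrative; fields left at their zero value are assumed to be filled with defaults by Validate:

cfg := kafkaclient.ConsumerConfig{
	Bootstrap:            "localhost:9092,localhost:9093", // multiple brokers, comma separated
	GroupID:              "analytics",
	MaxMsgInterval:       5 * time.Minute,  // restart if no message for 5 minutes
	SyncTopicInterval:    15 * time.Second, // re-sync the topic list every 15 seconds
	MaxConsumeGoroutines: 8,                // size of the consuming worker pool
	MaxConsumeErrorCount: 10,               // cancel the consumer after 10 handler errors
	GetTopics:            kafkaclient.GetAllTopic(),
	MessageHandler: func(msg *kafkago.Message, c *kafkaclient.Consumer) error {
		return nil // no-op handler for illustration
	},
	LogLevel: 0,     // zap InfoLevel
	Verbose:  false, // keep kafka-go logging quiet
}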

type GetTopicsFunc

type GetTopicsFunc func(broker string) (topics []string, err error)

GetTopicsFunc is a user-provided function that returns the topics to consume

@return topics []string
@return err error
@author kevineluo
@update 2023-03-28 07:16:54
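
A sketch of a custom GetTopicsFunc that ignores the broker address and returns a fixed topic list; the helper and topic names are hypothetical:

func staticTopics(topics ...string) kafkaclient.GetTopicsFunc {
	return func(broker string) ([]string, error) {
		if len(topics) == 0 {
			return nil, kafkaclient.ErrNoTopics
		}
		return topics, nil
	}
}

// usage: GetTopics: staticTopics("orders", "payments")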

func GetAllTopic

func GetAllTopic() GetTopicsFunc

GetAllTopic returns a GetTopicsFunc that fetches all topics from the broker.

@return GetTopicsFunc
@author kevineluo
@update 2023-03-15 03:14:57

func GetTopicReMatch

func GetTopicReMatch(reList []string) GetTopicsFunc

GetTopicReMatch returns a GetTopicsFunc that returns the topics matching the given regular expressions (resTopics) and an error if applicable.

@param reList []string
@return GetTopicsFunc
@author kevineluo
@update 2023-03-29 03:22:56

type MessageHandler

type MessageHandler func(msg *kafka.Message, consumer *Consumer) (err error)

MessageHandler is a function that handles messages received from the Kafka broker.

@param msg *kafka.Message
@param consumer *Consumer
@return err error
@author kevineluo
@update 2023-03-28 07:16:44
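
A sketch of a MessageHandler that decodes JSON payloads, using the aliases from the earlier sketch plus encoding/json and log. The event fields are hypothetical; a returned error is counted toward MaxConsumeErrorCount:

var handleEvent kafkaclient.MessageHandler = func(msg *kafkago.Message, c *kafkaclient.Consumer) error {
	var event struct {
		ID   string `json:"id"`
		Kind string `json:"kind"`
	}
	if err := json.Unmarshal(msg.Value, &event); err != nil {
		return err // counted against MaxConsumeErrorCount
	}
	log.Printf("topic=%s partition=%d id=%s kind=%s", msg.Topic, msg.Partition, event.ID, event.Kind)
	return nil
}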

type Producer

type Producer struct {
	ProducerConfig
	// contains filtered or unexported fields
}

Producer struct holds data related to the producer

@author kevineluo
@update 2023-03-15 10:30:44

func NewProducer

func NewProducer(ctx context.Context, config ProducerConfig) (p *Producer, err error)

NewProducer creates a new Kafka producer.

@param ctx context.Context
@param config ProducerConfig
@return p *Producer
@return err error
@author kevineluo
@update 2023-03-15 10:52:06

NewProducer creates a new producer with the given config. A producer is a wrapper around a kafka-go writer with some additional features such as topic auto-creation and message batching. Note that the producer is not thread-safe: if you want to use one producer in multiple goroutines, you must do the synchronization yourself (see the sketch below).
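
Since the producer is documented as not thread-safe, here is a minimal sketch of one way to share it between goroutines by serializing writes with a sync.Mutex; the wrapper type is not part of this package:

// safeProducer serializes access to a single Producer.
type safeProducer struct {
	mu sync.Mutex
	p  *kafkaclient.Producer
}

func (s *safeProducer) Write(ctx context.Context, msgs ...kafkago.Message) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.p.WriteMessages(ctx, msgs...)
}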

func (*Producer) Close

func (producer *Producer) Close() error

Close closes the producer

@receiver producer *Producer
@return error
@author kevineluo
@update 2023-03-15 02:43:18

func (*Producer) Closed added in v0.0.2

func (producer *Producer) Closed() <-chan struct{}

Closed returns a channel that is closed once the Producer is closed

@receiver producer *Producer
@return <-chan struct{}
@author kevineluo
@update 2023-03-30 05:11:40

func (*Producer) WriteMessages

func (producer *Producer) WriteMessages(ctx context.Context, msgs ...kafka.Message) (err error)

WriteMessages writes messages to Kafka; note that every message must set its own topic

@receiver producer *Producer
@param ctx context.Context
@param msgs ...kafka.Message
@return err error
@author kevineluo
@update 2023-03-29 02:46:03
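
A sketch of writing a batch where each message carries its own topic; the topics and payloads are illustrative, and producer and ctx are assumed to exist as in the earlier sketch:

err := producer.WriteMessages(ctx,
	kafkago.Message{Topic: "orders", Key: []byte("o-1"), Value: []byte(`{"id":"o-1"}`)},
	kafkago.Message{Topic: "audit", Value: []byte("order o-1 created")},
)
if err != nil {
	log.Printf("write failed: %v", err)
}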

type ProducerConfig

type ProducerConfig struct {
	// kafka bootstrap, default: "localhost:9092"
	Bootstrap string

	// determines whether messages are written asynchronously (true) or synchronously (false), default: false
	Async bool

	// By default Kafka has auto.create.topics.enable='true', in which case you can ignore this option.
	// When enabled, the producer will attempt to create the topic before publishing the message;
	// otherwise it will return an error when the topic is missing.
	// default: false
	AllowAutoTopicCreation bool

	// logger backed by a logr.LogSink implementation; default: a zapr.Logger
	Logger *logr.Logger

	// used when Logger is nil; follows zap-style levels (https://pkg.go.dev/go.uber.org/zap@v1.24.0/zapcore#Level)
	// and sets the log level for the default zapr.Logger (LogLevel should be in the range [-1, 5])
	// default: 0 -- InfoLevel
	LogLevel int

	// enable verbose kafka-go log, default: false
	Verbose bool
}

ProducerConfig configuration object used to create new instances of Producer

@author kevineluo
@update 2023-03-15 03:01:48
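
A sketch of a ProducerConfig for asynchronous writes with topic auto-creation, using the kafkaclient alias from the earlier sketch. Values are illustrative; unset fields are assumed to fall back to the defaults set by Validate:

cfg := kafkaclient.ProducerConfig{
	Bootstrap:              "localhost:9092",
	Async:                  true, // WriteMessages returns without waiting for the write to complete
	AllowAutoTopicCreation: true, // try to create missing topics before publishing
	LogLevel:               0,    // zap InfoLevel
	Verbose:                false,
}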

func (*ProducerConfig) Validate

func (config *ProducerConfig) Validate() (err error)

Validate checks the config and sets default values

@receiver config *ProducerConfig
@return err error
@author kevineluo
@update 2023-03-15 03:19:23
