kafkatool

package
v0.0.51 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 2, 2024 License: MIT Imports: 11 Imported by: 0

README

Kafka Note Docker...

建立topic docker-compose exec broker
kafka-topics --bootstrap-server broker:9092
--create
--topic test

docker-compose exec broker
--list --zookeeper localhost:2181

docker-compose exec broker
--describe --zookeeper localhost:2181 --topic topicName

查看一個Topic 有多少個Partition kakfa-topic.sh --list topic topicName --zookeeper zookeeper.servers.list

對topic發送消息 docker-compose exec --interactive --tty broker
kafka-console-producer --bootstrap-server broker:9092
--topic topicName

aaaaaaddasdasd

閱讀topic內的消息 docker-compose exec --interactive --tty broker
kafka-console-consumer --bootstrap-server broker:9092
--topic topicName
--from-beginning

Kafka 系統的角色

  • Broker :一台kafka服務器就是一個broker。一個集群由多個broker組成。一個broker可以容納多個topic
  • topic: 可以理解為一個MQ消息隊列的名字
  • Partition:為了實現擴展性,一個非常大的topic可以分佈到多個 broker(即服務器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列。 partition中的每條消息 都會被分配一個有序的id(offset)。 kafka只保證按一個partition中的順序將消息發給consumer,不保證一個topic的整體 (多個partition間)的順序。也就是說,一個topic在集群中可以有多個partition,那麼分區的策略是什麼? (消息發送到哪個分區上,有兩種基本的策略,一是採用Key Hash算法,一是採用Round Robin算法)
  • Offset:kafka的存儲文件都是按照offset.kafka來命名,用offset做名字的好處是方便查找。例如你想找位於2049的位置,只要找到2048.kafka的文件即可。當然the first offset就是00000000000.kafka
  • Producer :消息生產者,就是向kafka broker發消息的客戶端。
  • Consumer :消息消費者,向kafka broker取消息的客戶端
  • Consumer Group (CG):消息系統有兩類,一是廣播,二是訂閱發布。廣播是把消息發送給所有的消費者;發布訂閱是把消息只發送給訂閱者。 Kafka通過Consumer Group組合實現了這兩種機制: 實現一個topic消息廣播(發給所有的consumer)和單播(發給任意一個consumer)。一個 topic可以有多個CG。 topic的消息會復制(不是真的複制,是概念上的)到所有的CG,但每個CG只會把消息發給該CG中的一個 consumer(這是實現一個Topic多Consumer的關鍵點:為一個Topic定義一個CG,CG下定義多個Consumer)。如果需要實現廣播,只要每個consumer有一個獨立的CG就可以了。要實現單播只要所有的consumer在同一個CG。用CG還 可以將consumer進行自由的分組而不需要多次發送消息到不同的topic。典型的應用場景是,多個Consumer來讀取一個Topic(理想情況下是一個Consumer讀取Topic的一個Partition),那麼可以讓這些Consumer屬於同一個Consumer Group即可實現消息的多Consumer並行處理,原理是Kafka將一個消息發佈出去後,ConsumerGroup中的Consumers可以通過Round Robin的方式進行消費(Consumers之間的負載均衡使用Zookeeper來實現)

Zookeeper 在Kakfa中扮演的角色

  • kafka使用zookeeper來實現動態的集群擴展,不需要更改客戶端(producer和consumer)的配置。
  • broker會在zookeeper註冊並保持相關的元數據(topic,partition信息等)更新。
  • 而客戶端會在zookeeper上註冊相關的watcher。一旦zookeeper發生變化,客戶端能及時感知並作出相應調整。這樣就保證了添加或去除broker時,各broker間仍能自動實現負載均衡。這裡的客戶端指的是Kafka的消息生產端(Producer)和消息消費端(Consumer)
  • Broker端使用zookeeper來註冊broker信息,以及監測partition leader存活性.
  • Consumer端使用zookeeper用來註冊consumer信息,其中包括consumer消費的partition列表等,同時也用來發現broker列表,並和partition leader建立socket連接,並獲取消息.
  • Zookeer和Producer沒有建立關係,只和Brokers、Consumers建立關係以實現負載均衡,即同一個Consumer Group中的Consumers可以實現負載均衡

kafka 注意事項

Partition
  • Topic有多個Partition,消息分配到某個Partition的依據是Key Hash或者Round Robin。

  • 正常來說,每個partition 能處理的吞吐為幾MB/s(仍需要基於根據本地環境測試後獲取準確指標),增加更多的partitions意味著:

更高的並行度與吞吐 可以擴展更多的(同一個consumer group中的)consumers 若是集群中有較多的brokers,則可更大程度上利用閒置的brokers 但是會造成Zookeeper的更多選舉 也會在Kafka中打開更多的文件

調整準則:

一般來說,若是集群較小(小於6個brokers),則配置2 x broker數的partition數。在這裡主要考慮的是之後的擴展。若是集群擴展了一倍(例如12個),則不用擔心會有partition不足的現象發生 一般來說,若是集群較大(大於12個),則配置1 x broker 數的partition數。因為這裡不需要再考慮集群的擴展情況,與broker數相同的partition數已經足夠應付常規場景。若有必要,則再手動調整 考慮最高峰吞吐需要的並行consumer數,調整partition的數目。若是應用場景需要有20個(同一個consumer group中的)consumer並行消費,則據此設置為20個partition 考慮producer所需的吞吐,調整partition數目(如果producer的吞吐非常高,或是在接下來兩年內都比較高,則增加partition的數目)

Partition 只能增加不能減少
Replication Factor

此參數決定的是records複製的數目,建議至少 設置為2,一般是3,最高設置為4。更高的replication factor(假設數目為N)意味著:

系統更穩定(允許N-1個broker宕機) 更多的副本(如果acks=all,則會造成較高的延時) 系統磁盤的使用率會更高(一般若是RF為3,則相對於RF為2時,會佔據更多50% 的磁盤空間)

調整準則:

以3為起始(當然至少需要有3個brokers,同時也不建議一個Kafka 集群中節點數少於3個節點) 如果replication 性能成為了瓶頸或是一個issue,則建議使用一個性能更好的broker,而不是降低RF的數目

Replication Factor 在 prod環境中起始值,請勿少於3

慎用,得考慮資料的增長速度,避免儲存資料量太大,造成空間不足

相對的會延長kafka的寫入時間

How to Use

<!-- 建立Topic及其Partition數量 -->
	config.CreateTopic("test1", 10)

<!-- 一次寫入一大批僅有value的資料 -->
1.
	s := make([]string, 10000)
	for i := 0; i < 10000; i++ {
		s[i] = "value " + gotool.IntToStr(i)
	}
	config.WriteMessages("test3", s...)

2.
    config.WriteMessages("test3", "da", "da", "der", "ma", "te", "sen")

<!-- 一次寫入一大批的帶著key&Value資料 -->
	 m := map[string]string{}
	 for i := 0; i < 10000; i++ {
	 	m[gotool.IntToStr(i)+"@player"] = "value " + gotool.IntToStr(i)
	 }
	 config.WriteMessagesKeyValue("test1", m)

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func InitializeConsumer

func InitializeConsumer()

func InitializePublisher

func InitializePublisher()

func NewDLQReader added in v0.0.43

func NewDLQReader(hosts, topic, groupID string) *kafka.Reader

NewDLQReader - 建立NewDLQReader hosts - kafka host list, 可多組, 用逗號分開. e.g.:100.0.0.1:9092,100.0.0.2:9092

func NewReader added in v0.0.42

func NewReader(hosts, topic, groupID string) *kafka.Reader

NewReader - 建立Reader hosts - kafka host list, 可多組, 用逗號分開. e.g.:100.0.0.1:9092,100.0.0.2:9092

func NewWriter added in v0.0.42

func NewWriter(hosts, topic string) *kafka.Writer

NewWriter - 建立Writer, 可多組, 用逗號分開 e.g.:100.0.0.1:9092,100.0.0.2:9092

Types

type KafkaConfig

type KafkaConfig struct {
	Network           string   `yaml:"network"`
	Address           []string `yaml:"adress"`
	NumPartition      int      `yaml:"numPartition"`
	ReplicationFactor int      `yaml:"replicationFactor"`
	Conn              *kafka.Conn
}

KafkaConfig - Represents a Configuration

func (*KafkaConfig) CreateTopic

func (config *KafkaConfig) CreateTopic(topic string, num ...int)

CreateTopic -建立topic 1.topic 2.NumPartition 3.ReplicationFactor

func (*KafkaConfig) DelTopic

func (config *KafkaConfig) DelTopic(topic ...string)

DelTopic - 刪除Topic的列表

func (*KafkaConfig) GetTopic

func (config *KafkaConfig) GetTopic() []string

GetTopic - 取得Topic的列表

func (*KafkaConfig) ReadMessages

func (config *KafkaConfig) ReadMessages(topic, groupid string)

ReadMessages - 接收Topic的訊息

func (*KafkaConfig) WriteMessages

func (config *KafkaConfig) WriteMessages(topic string, value ...string)

WriteMessages - 發送訊息到Topic

func (*KafkaConfig) WriteMessagesKeyValue

func (config *KafkaConfig) WriteMessagesKeyValue(topic string, value map[string]string)

WriteMessagesKeyValue - 發送訊息到Topic

func (*KafkaConfig) WriteMessagesKeyValueList added in v0.0.6

func (config *KafkaConfig) WriteMessagesKeyValueList(topic string, value []WriteData) error

WriteMessagesKeyValueList - 發送訊息到Topic

type WorkerCircuitBreaker added in v0.0.48

type WorkerCircuitBreaker struct {
	// contains filtered or unexported fields
}

func NewWorkerCircuitBreaker added in v0.0.48

func NewWorkerCircuitBreaker(bucketVolume, consumeTokenPerRequest, maxDataSize int) *WorkerCircuitBreaker

NewWorkerCircuitBreaker bucketVolume: 令牌桶容量 consumeTokenPerRequest: 每次消耗令牌量 maxDataSize: 滿足此條件表示要消耗令牌 e.g. bucketVolume = 10, consumeTokenPerRequest = 3, maxDataSize = 100 表示每秒產生1個token, 但每次消耗3個token, 當資料筆數大於等於100時, 消耗三個token 當桶子內的token 不夠時, 會等待下一個token產生 用這方式降低資料庫寫入壓力

func (*WorkerCircuitBreaker) Check added in v0.0.48

func (r *WorkerCircuitBreaker) Check(ctx context.Context, dataCount int)

func (*WorkerCircuitBreaker) CheckContinuouslyWriting added in v0.0.49

func (r *WorkerCircuitBreaker) CheckContinuouslyWriting(ctx context.Context)

type WriteData added in v0.0.6

type WriteData struct {
	Key   string
	Value string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL