gxkafka

package
v0.3.3
Warning

This package is not in the latest version of its module.
Published: Dec 9, 2020 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package gxkafka encapsulates some Kafka functions based on github.com/Shopify/sarama. MOD: 2016-06-01 05:57

Constants

const (
	OFFSETS_PROCESSING_TIMEOUT_SECONDS = 10e9
	OFFSETS_COMMIT_INTERVAL            = 10e9
)
const (
	HASH = iota + 1
	RANDOM
)

Variables

This section is empty.

Functions

func GetBrokerList

func GetBrokerList(zkHosts string) ([]string, error)

Types

type AsyncProducer

type AsyncProducer interface {
	SendMessage(topic string, key interface{}, message interface{}, metadata interface{}) error
	SendBytes(topic string, key []byte, message []byte, metadata interface{})
	Start()
	Stop()
	Terminate()
}

AsyncProducer is the interface for sending messages to Kafka asynchronously.

func NewAsyncProducer

func NewAsyncProducer(
	clientID string,
	brokers []string,
	partitionMethod int,
	waitForAllAck bool,
	updateMetaDataInterval int,
	compressionType sarama.CompressionCodec,
	successfulMessageCallback ProducerMessageCallback,
	errorCallback ProducerErrorCallback,
) (AsyncProducer, error)

NewAsyncProducer constructs a new AsyncProducer for the given broker addresses.
@clientID must match sarama's validID pattern [sarama config.go: var validID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`)].
@updateMetaDataInterval is in seconds. It keeps the socket connection alive, so its value should be less than connections.max.idle.ms.
@compressionType: note that the Kafka version must be >= V0_10_0_0 if you want to use CompressionLZ4.
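
The @clientID constraint can be checked before calling the constructor. The following is a minimal sketch that reuses the pattern quoted above; checkClientID is a hypothetical helper and not part of gxkafka or sarama.

```go
package main

import (
	"fmt"
	"regexp"
)

// validID mirrors the pattern quoted from sarama's config.go.
var validID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`)

// checkClientID is a hypothetical helper (not part of gxkafka). It returns
// an error when id contains characters outside the allowed set or is empty.
func checkClientID(id string) error {
	if !validID.MatchString(id) {
		return fmt.Errorf("client ID %q does not match %s", id, validID)
	}
	return nil
}

func main() {
	for _, id := range []string{"order-service.v1", "bad id", ""} {
		fmt.Printf("%q: %v\n", id, checkClientID(id))
	}
}
```

Only the first ID passes: the second contains a space and the third is empty, and the pattern requires at least one allowed character.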

type Consumer

type Consumer interface {
	Start() error
	Commit(*sarama.ConsumerMessage)
	Stop()
}

Consumer is the interface for consuming messages from Kafka. Incoming messages are delivered through a ConsumerMessageCallback.

func NewConsumer

func NewConsumer(
	clientID string,
	brokers []string,
	topicList []string,
	consumerGroup string,
	msgCb ConsumerMessageCallback,
	errCb ConsumerErrorCallback,
	ntfCb ConsumerNotificationCallback,
) (Consumer, error)

NewConsumer constructs a consumer.
@clientID must match sarama's validID pattern [sarama config.go: var validID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`)].
The following explanation is deprecated (2017-03-07): NewConsumer could not take brokers directly as a parameter because /wvanderbergen/kafka/consumer uses consumer groups, and each consumer's information has to be stored in ZooKeeper.

type ConsumerErrorCallback

type ConsumerErrorCallback func(error)

Consumer invokes @ConsumerErrorCallback when it encounters an error.

type ConsumerMessageCallback

type ConsumerMessageCallback func(msg *sarama.ConsumerMessage, preOffset int64)

Consumer invokes @ConsumerMessageCallback when it receives a message.
@msg: the consumed message.
@preOffset: the offset of the message that precedes @msg in the same partition.

If @msg is this partition's first message, its preOffset is 0.
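
One possible use of @preOffset is detecting skipped offsets inside a callback. The sketch below uses a hypothetical message type in place of *sarama.ConsumerMessage so that it compiles without sarama, and hasGap is an illustrative helper, not part of gxkafka. It assumes offsets within a partition are expected to be contiguous, which does not hold for compacted topics.

```go
package main

import "fmt"

// message is a hypothetical stand-in for the Partition and Offset fields of
// *sarama.ConsumerMessage.
type message struct {
	Partition int32
	Offset    int64
}

// hasGap reports whether offsets were skipped between the previous message
// and msg. The documentation says preOffset is 0 for a partition's first
// message, so preOffset == 0 is treated as "no previous message". A real
// previous offset of 0 is indistinguishable from that case here.
func hasGap(msg message, preOffset int64) bool {
	if preOffset == 0 {
		return false
	}
	return msg.Offset != preOffset+1
}

func main() {
	fmt.Println(hasGap(message{Partition: 0, Offset: 42}, 41)) // false
	fmt.Println(hasGap(message{Partition: 0, Offset: 45}, 41)) // true
	fmt.Println(hasGap(message{Partition: 0, Offset: 7}, 0))   // false
}
```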

type ConsumerNotificationCallback

type ConsumerNotificationCallback func(*sc.Notification)

Consumer invokes @ConsumerNotificationCallback when it receives a notification.

type Producer

type Producer interface {
	SendMessage(topic string, key interface{}, message interface{}) (int32, int64, error)
	SendBytes(topic string, key []byte, message []byte) (int32, int64, error)
	Stop()
}

Producer is the interface for sending messages to Kafka synchronously.
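
Because Producer is an interface, calling code can be exercised against an in-memory stand-in instead of a live broker. The sketch below repeats the interface locally so it is self-contained; fakeProducer and newFakeProducer are hypothetical names. Two things are assumptions: that the int32 and int64 results are the partition and offset, and the fmt.Sprint encoding in SendMessage, which is an arbitrary choice and not the package's real serialization.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Producer repeats the documented interface so the sketch is self-contained.
type Producer interface {
	SendMessage(topic string, key interface{}, message interface{}) (int32, int64, error)
	SendBytes(topic string, key []byte, message []byte) (int32, int64, error)
	Stop()
}

// fakeProducer is a hypothetical in-memory test double; it is not gxkafka code.
type fakeProducer struct {
	mu      sync.Mutex
	sent    map[string][][]byte // payloads recorded per topic
	stopped bool
}

func newFakeProducer() *fakeProducer {
	return &fakeProducer{sent: make(map[string][][]byte)}
}

// SendBytes records the payload and returns partition 0 plus a per-topic
// sequence number. Assumption: the results stand for partition and offset.
func (p *fakeProducer) SendBytes(topic string, key []byte, message []byte) (int32, int64, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.stopped {
		return 0, 0, errors.New("producer stopped")
	}
	p.sent[topic] = append(p.sent[topic], message)
	return 0, int64(len(p.sent[topic]) - 1), nil
}

// SendMessage encodes key and message with fmt.Sprint (an arbitrary choice
// for this sketch) and delegates to SendBytes.
func (p *fakeProducer) SendMessage(topic string, key interface{}, message interface{}) (int32, int64, error) {
	return p.SendBytes(topic, []byte(fmt.Sprint(key)), []byte(fmt.Sprint(message)))
}

// Stop makes every later send fail.
func (p *fakeProducer) Stop() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.stopped = true
}

// Compile-time check that fakeProducer satisfies Producer.
var _ Producer = (*fakeProducer)(nil)

func main() {
	var p Producer = newFakeProducer()
	_, offset, err := p.SendBytes("events", nil, []byte("hello"))
	fmt.Println(offset, err) // 0 <nil>
	p.Stop()
}
```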

func NewProducer

func NewProducer(
	clientID string,
	brokers []string,
	partitionMethod int,
	waitForAllAck bool,
	updateMetaDataInterval int,
	compressionType sarama.CompressionCodec,
) (Producer, error)

NewProducer constructs a new SyncProducer for the given broker addresses.
@clientID must match sarama's validID pattern [sarama config.go: var validID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`)].
@updateMetaDataInterval is in seconds. It keeps the socket connection alive, so its value should be less than connections.max.idle.ms.
@compressionType: note that the Kafka version must be >= V0_10_0_0 if you want to use CompressionLZ4.
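
The @updateMetaDataInterval rule mixes units: the parameter is in seconds, while connections.max.idle.ms is in milliseconds. A minimal sketch of the comparison follows; intervalKeepsConnectionAlive is a hypothetical helper, and the 600000 ms figure is only an example value for the broker setting.

```go
package main

import (
	"fmt"
	"time"
)

// intervalKeepsConnectionAlive is a hypothetical helper (not part of
// gxkafka). It reports whether a metadata refresh interval given in seconds
// is strictly shorter than connections.max.idle.ms given in milliseconds.
func intervalKeepsConnectionAlive(updateMetaDataIntervalSec int, maxIdleMs int64) bool {
	interval := time.Duration(updateMetaDataIntervalSec) * time.Second
	maxIdle := time.Duration(maxIdleMs) * time.Millisecond
	return interval < maxIdle
}

func main() {
	const maxIdleMs = 600000 // example value: 10 minutes

	fmt.Println(intervalKeepsConnectionAlive(300, maxIdleMs)) // true: 5 min < 10 min
	fmt.Println(intervalKeepsConnectionAlive(900, maxIdleMs)) // false: 15 min > 10 min
}
```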

type ProducerErrorCallback

type ProducerErrorCallback func(*sarama.ProducerError)

AsyncProducer invokes @ProducerErrorCallback when it receives an error response for a message.

type ProducerMessageCallback

type ProducerMessageCallback func(*sarama.ProducerMessage)

AsyncProducer invokes @ProducerMessageCallback when it receives a success response for a message.
