Documentation
¶
Index ¶
- Constants
- Variables
- func InitAsyncKafkaProducer(name string, hosts []string, config *sarama.Config) error
- func InitSyncKafkaProducer(name string, hosts []string, config *sarama.Config) error
- func KafkaMsgValueEncoder(value []byte) sarama.Encoder
- func KafkaMsgValueStrEncoder(value string) sarama.Encoder
- type AsyncProducer
- type Consumer
- type Kafka
- type KafkaMessageHandler
- type KafkaProducer
- type SyncProducer
Constants ¶
View Source
const (
	// KafkaProducerConnected 生产者已连接
	KafkaProducerConnected string = "connected"
	// KafkaProducerDisconnected 生产者已断开
	KafkaProducerDisconnected string = "disconnected"
	// KafkaProducerClosed 生产者已关闭
	KafkaProducerClosed string = "closed"

	DefaultKafkaAsyncProducer = "default-kafka-async-producer"
	DefaultKafkaSyncProducer  = "default-kafka-sync-producer"
)
region 常量
View Source
const (
	KafkaConsumerConnected    string = "connected"
	KafkaConsumerDisconnected string = "disconnected"
)
Variables ¶
View Source
var (
	ErrProducerTimeout = errors.New("push message timeout")

	KafkaSyncProducers  = make(map[string]*SyncProducer)
	KafkaAsyncProducers = make(map[string]*AsyncProducer)

	KafkaStdLogger stdLogger
)
region 定义变量
Functions ¶
func InitAsyncKafkaProducer ¶
InitAsyncKafkaProducer 初始化异步生产者
func InitSyncKafkaProducer ¶
InitSyncKafkaProducer 初始化同步生产者
func KafkaMsgValueEncoder ¶
func KafkaMsgValueStrEncoder ¶
Types ¶
type AsyncProducer ¶
type AsyncProducer struct {
KafkaProducer
AsyncProducer *sarama.AsyncProducer
}
AsyncProducer 异步生产者
func GetKafkaAsyncProducer ¶
func GetKafkaAsyncProducer(name string) *AsyncProducer
func (*AsyncProducer) Close ¶
func (asyncProducer *AsyncProducer) Close() error
func (*AsyncProducer) Send ¶
func (asyncProducer *AsyncProducer) Send(msg *sarama.ProducerMessage) error
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func StartKafkaConsumer ¶
func StartKafkaConsumer(hosts, topics []string, groupID string, config *sarama.Config, f KafkaMessageHandler) (*Consumer, error)
StartKafkaConsumer 启动消费者
type KafkaMessageHandler ¶
type KafkaMessageHandler func(message *sarama.ConsumerMessage) (bool, error)
KafkaMessageHandler 消费者回调函数
type KafkaProducer ¶
type SyncProducer ¶
type SyncProducer struct {
KafkaProducer
SyncProducer *sarama.SyncProducer
}
SyncProducer 同步生产者
func GetKafkaSyncProducer ¶
func GetKafkaSyncProducer(name string) *SyncProducer
func (*SyncProducer) Close ¶
func (syncProducer *SyncProducer) Close() error
func (*SyncProducer) Send ¶
func (syncProducer *SyncProducer) Send(msg *sarama.ProducerMessage) (partition int32, offset int64, err error)
Send 同步发送消息到 kafka
func (*SyncProducer) SendMessages ¶
func (syncProducer *SyncProducer) SendMessages(msgs []*sarama.ProducerMessage) (errs sarama.ProducerErrors)
SendMessages 同步批量发送消息到 kafka
Click to show internal directories.
Click to hide internal directories.