bkafka

package module
v1.0.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 11, 2022 | License: MIT | Imports: 9 | Imported by: 0

README

bkafka

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultProduceEventHandler

func DefaultProduceEventHandler(event kafka.Event, done bool)

DefaultProduceEventHandler 默认生产者事件处理

Types

type Config

type Config map[string]interface{}

func LoadJsonConf4Consumer

func LoadJsonConf4Consumer(data []byte) (Config, error)

LoadJsonConf4Consumer 通过json格式加载consumer配置

func LoadJsonConf4Producer

func LoadJsonConf4Producer(data []byte) (Config, error)

LoadJsonConf4Producer 通过json格式加载producer配置

func (Config) ToKafkaConfig

func (c Config) ToKafkaConfig() *kafka.ConfigMap

type Consumer

type Consumer struct {
	*kafka.Consumer
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(conf Config) (*Consumer, error)

NewConsumer 实例化Consumer

func (*Consumer) GroupId

func (c *Consumer) GroupId() string

GroupId 获取groupId

func (*Consumer) HandlerEvent

func (c *Consumer) HandlerEvent(eh EventHandler)

HandlerEvent 处理事件

func (*Consumer) Total

func (c *Consumer) Total() int64

Total 获取总消费数量

type EventHandler

type EventHandler func(event kafka.Event, done bool)

EventHandler 事件处理

type Producer

type Producer struct {
	*kafka.Producer
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(conf Config, eh EventHandler) (*Producer, error)

NewProducer 实例化Producer

func (*Producer) FlushAll

func (p *Producer) FlushAll(ctx context.Context) (err error)

func (*Producer) Produce2Buffer

func (p *Producer) Produce2Buffer(topic string, value []byte, partition int32, key []byte) error

Produce2Buffer 向librdkafka写消息，错误类型包括：队列已满、超时等

func (*Producer) ProduceAsync

func (p *Producer) ProduceAsync(topic string, value []byte, partition int32, key []byte)

ProduceAsync 异步向某个topic发送消息

func (*Producer) ProduceMsg2Buffer

func (p *Producer) ProduceMsg2Buffer(topic string, value []byte) error

ProduceMsg2Buffer 向librdkafka发送消息，错误类型包括：队列已满、超时等

func (*Producer) ProduceMsgAsync

func (p *Producer) ProduceMsgAsync(topic string, value []byte)

ProduceMsgAsync 异步向某个topic发送消息

func (*Producer) ProduceOrder2Buffer

func (p *Producer) ProduceOrder2Buffer(topic string, value []byte, id int64) error

ProduceOrder2Buffer 向librdkafka顺序(同一个id会发送到同一个partition)发送消息

func (*Producer) ProduceOrderAsync

func (p *Producer) ProduceOrderAsync(topic string, value []byte, id int64)

ProduceOrderAsync 异步向某个topic顺序(同一个id会发送到同一个partition)发送消息

func (*Producer) SuccessRate

func (p *Producer) SuccessRate() float64

SuccessRate 投递成功率

func (*Producer) TotalFailed

func (p *Producer) TotalFailed() int64

TotalFailed 投递失败总量

func (*Producer) TotalSuccess

func (p *Producer) TotalSuccess() int64

TotalSuccess 投递成功总量

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL