kafka

package
v1.2.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 4, 2020 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewMsg

func NewMsg(topic string, value interface{}) (msg *sarama.ProducerMessage, err error)

kafka消息构造函数

Types

type AsyncProducer

type AsyncProducer struct {
	sarama.AsyncProducer
}

异步生产者

func NewAsyncProducer

func NewAsyncProducer(brokers []string, config *sarama.Config) *AsyncProducer

新建异步生产者

func (*AsyncProducer) SendJSON

func (p *AsyncProducer) SendJSON(topic string, value interface{}) (result interface{}, err error)

异步生产者发送单条消息

func (*AsyncProducer) SendJSONs

func (p *AsyncProducer) SendJSONs(messages []*mq.ProducerMessage) (err error)

异步生产者批量发送消息

type Consumer

type Consumer struct {
	*cluster.Consumer
	// contains filtered or unexported fields
}

消费者对象

func NewConsumer

func NewConsumer(groupID string, topics []string) *Consumer

新建消费者

func (*Consumer) BindJSONChan

func (c *Consumer) BindJSONChan(channel interface{})

将消息输出绑定到指定管道上,此方法内会进行反序列化,输出的消息类型可以是指定对象类型

func (*Consumer) BytesMessages

func (c *Consumer) BytesMessages() <-chan []byte

消息读取管道,管道消息类型是byte切片

func (*Consumer) GetLogger

func (c *Consumer) GetLogger() *logger.Logger

获取当前日志记录器

func (*Consumer) SetLogger

func (c *Consumer) SetLogger(log *logger.Logger)

将底层类库的日志输出到指定日志记录器

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

生产者对象

func NewProducer

func NewProducer() *Producer

新建生产者

func (*Producer) Close

func (p *Producer) Close() error

关闭

func (*Producer) GetLogger

func (p *Producer) GetLogger() *logger.Logger

获取当前日志记录器

func (*Producer) SendJSON

func (p *Producer) SendJSON(topic string, value interface{}) (result interface{}, err error)

发送单条消息

func (*Producer) SendJSONs

func (p *Producer) SendJSONs(messages []*mq.ProducerMessage) (err error)

批量发送消息

func (*Producer) SetLogger

func (p *Producer) SetLogger(log *logger.Logger)

将底层类库的日志输出到指定日志记录器

type SyncProducer

type SyncProducer struct {
	sarama.SyncProducer
}

同步生产者

func NewSyncProducer

func NewSyncProducer(brokers []string, config *sarama.Config) *SyncProducer

新建同步生产者

func (*SyncProducer) SendJSON

func (p *SyncProducer) SendJSON(topic string, value interface{}) (result interface{}, err error)

同步生产者发送单条消息

func (*SyncProducer) SendJSONs

func (p *SyncProducer) SendJSONs(messages []*mq.ProducerMessage) (err error)

同步生产者批量发送消息

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL