Documentation
¶
Index ¶
- Variables
- func AsBatchProducer() optparams.Option[Options]
- func WithAcks(acks int) optparams.Option[Options]
- func WithParallelCallback() optparams.Option[Options]
- func WithProducerSetting(key string, value any) optparams.Option[Options]
- func WithQueueBufferingMaxDelay(delay int) optparams.Option[Options]
- func WithoutConfirmDelivery() optparams.Option[Options]
- type DeliveryCallback
- type DeliveryUnknownEventCallback
- type Options
- type ProducerProxy
- func (proxy *ProducerProxy) Close()
- func (proxy *ProducerProxy) DeliveredRecords() int64
- func (proxy *ProducerProxy) Init(endpoints string, opts ...optparams.Option[Options]) error
- func (proxy *ProducerProxy) IsOk() bool
- func (proxy *ProducerProxy) IsWatchingDeliver() bool
- func (proxy *ProducerProxy) OnDelivery(cb ...DeliveryCallback) error
- func (proxy *ProducerProxy) OnDeliveryError(cb ...DeliveryCallback) error
- func (proxy *ProducerProxy) OnDeliveryUnknownEventCallback(cb ...DeliveryUnknownEventCallback) error
- func (proxy *ProducerProxy) Regist(cb ...SetConnectCallback) error
- func (proxy *ProducerProxy) Send(msg *kafka.Message)
- func (proxy *ProducerProxy) SendAndWait(msg *kafka.Message) error
- func (proxy *ProducerProxy) SetConnect(cli *kafka.Producer) error
- func (proxy *ProducerProxy) StartConfirmDelivery() error
- type SetConnectCallback
Constants ¶
This section is empty.
Variables ¶
var Default = New()
Default 默认的kafka Producer代理对象
var DefaultOptions = Options{ ConfigMap: kafka.ConfigMap{}, }
var ErrDeliverIsWatching = errors.New("can not set callback when deliver is watching")
ErrDeliverIsWatching 代理还未设置客户端对象
var ErrProxyAllreadySettedClient = errors.New("cannot reset producer")
ErrProxyAllreadySettedClient 代理已经设置过redis客户端对象
var ErrProxyNotYetSettedClient = errors.New("not set producer yet")
ErrProxyNotYetSettedClient 代理还未设置客户端对象
var Logger *log.Log
Functions ¶
func AsBatchProducer ¶
AsBatchProducer 设置Producer为批发送模式,不推荐
func WithAcks ¶
WithAcks 设置发送的消息需要kafka中多少个成员确认 params acks int kafka成员确认数量, 0表示不需要确认,-1表示需要全部成员确认,如果是其他正整数则不可以少于集群的最小成员数min.insync.replicas,默认为-1
func WithParallelCallback ¶
WithParallelCallback 设置callback并行执行
func WithProducerSetting ¶
WithProducerSetting 设置监听时的其他设置,具体设置可以看<https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md> 和<https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/producer.go>中`NewProducer`的说明 @params key string 设置项 @params value any 设置项的值
func WithQueueBufferingMaxDelay ¶
WithQueueBufferingMaxDelay 设置发送消息时构建消息批传送给kafka之前等待生产者队列中的消息积累的延迟 params delay int 构建消息批传送给kafka之前等待生产者队列中的消息积累的延迟,单位ms
func WithoutConfirmDelivery ¶
WithoutConfirmDelivery 设置不监听确认发送成功与否
Types ¶
type DeliveryCallback ¶
type ProducerProxy ¶
ProducerProxy redis客户端的代理
func (*ProducerProxy) DeliveredRecords ¶
func (proxy *ProducerProxy) DeliveredRecords() int64
DeliveredRecords 查看已经发送了几条信息
func (*ProducerProxy) Init ¶
Init 从配置条件初始化代理对象 @params endpoints string 设置etcd连接的地址端点,以`,`分隔 @params opts ...optparams.Option[Options]
func (*ProducerProxy) IsWatchingDeliver ¶
func (proxy *ProducerProxy) IsWatchingDeliver() bool
IsWatchingDeliver( 检查代理是否正在监听发送情况
func (*ProducerProxy) OnDelivery ¶
func (proxy *ProducerProxy) OnDelivery(cb ...DeliveryCallback) error
OnDelivery 注册当发未设置WithoutConfirmDelivery且发送成功时执行的回调 params cb ...DeliveryCallback 当发未设置WithoutConfirmDelivery且发送成功时执行的回调
func (*ProducerProxy) OnDeliveryError ¶
func (proxy *ProducerProxy) OnDeliveryError(cb ...DeliveryCallback) error
OnDeliveryError 注册当发未设置WithoutConfirmDelivery且发送失败时执行的回调 params cb ...DeliveryCallback 当发未设置WithoutConfirmDelivery且发送失败时执行的回调
func (*ProducerProxy) OnDeliveryUnknownEventCallback ¶
func (proxy *ProducerProxy) OnDeliveryUnknownEventCallback(cb ...DeliveryUnknownEventCallback) error
OnDeliveryUnknownEventCallback 注册当发未设置WithoutConfirmDelivery且监听到未知类型事件时的回调 params cb ...DeliveryUnknownEventCallback 当发未设置WithoutConfirmDelivery且监听到未知类型事件时的回调
func (*ProducerProxy) Regist ¶
func (proxy *ProducerProxy) Regist(cb ...SetConnectCallback) error
Regist 注册回调函数,在init执行后执行回调函数 如果对象已经设置了被代理客户端则无法再注册回调函数 @params cb ...Callback 回调函数
func (*ProducerProxy) Send ¶
func (proxy *ProducerProxy) Send(msg *kafka.Message)
func (*ProducerProxy) SendAndWait ¶
func (proxy *ProducerProxy) SendAndWait(msg *kafka.Message) error
func (*ProducerProxy) SetConnect ¶
func (proxy *ProducerProxy) SetConnect(cli *kafka.Producer) error
SetConnect 设置连接的客户端 @params cli *kafka.Producer 被代理
func (*ProducerProxy) StartConfirmDelivery ¶
func (proxy *ProducerProxy) StartConfirmDelivery() error
StartConfirmDelivery 启动发送消息验收,用于确认发送成功
type SetConnectCallback ¶
SetConnectCallback