producerproxy

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 25, 2022 License: MIT Imports: 4 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var Default = New()

Default 默认的kafka Producer代理对象

View Source
var DefaultOptions = Options{
	ConfigMap: kafka.ConfigMap{},
}
View Source
var ErrDeliverIsWatching = errors.New("can not set callback when deliver is watching")

ErrDeliverIsWatching 代理还未设置客户端对象

View Source
var ErrProxyAllreadySettedClient = errors.New("cannot reset producer")

ErrProxyAllreadySettedClient 代理已经设置过redis客户端对象

View Source
var ErrProxyNotYetSettedClient = errors.New("not set producer yet")

ErrProxyNotYetSettedClient 代理还未设置客户端对象

View Source
var Logger *log.Log

Functions

func AsBatchProducer

func AsBatchProducer() optparams.Option[Options]

AsBatchProducer 设置Producer为批发送模式,不推荐

func WithAcks

func WithAcks(acks int) optparams.Option[Options]

WithAcks 设置发送的消息需要kafka中多少个成员确认 params acks int kafka成员确认数量, 0表示不需要确认,-1表示需要全部成员确认,如果是其他正整数则不可以少于集群的最小成员数min.insync.replicas,默认为-1

func WithParallelCallback

func WithParallelCallback() optparams.Option[Options]

WithParallelCallback 设置callback并行执行

func WithProducerSetting

func WithProducerSetting(key string, value any) optparams.Option[Options]

WithProducerSetting 设置监听时的其他设置,具体设置可以看<https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md> 和<https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/producer.go>中`NewProducer`的说明 @params key string 设置项 @params value any 设置项的值

func WithQueueBufferingMaxDelay

func WithQueueBufferingMaxDelay(delay int) optparams.Option[Options]

WithQueueBufferingMaxDelay 设置发送消息时构建消息批传送给kafka之前等待生产者队列中的消息积累的延迟 params delay int 构建消息批传送给kafka之前等待生产者队列中的消息积累的延迟,单位ms

func WithoutConfirmDelivery

func WithoutConfirmDelivery() optparams.Option[Options]

WithoutConfirmDelivery 设置不监听确认发送成功与否

Types

type DeliveryCallback

type DeliveryCallback func(evt *kafka.Message)

type DeliveryUnknownEventCallback

type DeliveryUnknownEventCallback func(evt kafka.Event)

type Options

type Options struct {
	kafka.ConfigMap
	ParallelCallback   bool
	NotConfirmDelivery bool
}

Option 设置key行为的选项

type ProducerProxy

type ProducerProxy struct {
	*kafka.Producer
	Opt Options
	// contains filtered or unexported fields
}

ProducerProxy redis客户端的代理

func New

func New() *ProducerProxy

New 创建一个新的kafka Producer客户端代理

func (*ProducerProxy) Close

func (proxy *ProducerProxy) Close()

Close 关闭发送端

func (*ProducerProxy) DeliveredRecords

func (proxy *ProducerProxy) DeliveredRecords() int64

DeliveredRecords 查看已经发送了几条信息

func (*ProducerProxy) Init

func (proxy *ProducerProxy) Init(endpoints string, opts ...optparams.Option[Options]) error

Init 从配置条件初始化代理对象 @params endpoints string 设置etcd连接的地址端点,以`,`分隔 @params opts ...optparams.Option[Options]

func (*ProducerProxy) IsOk

func (proxy *ProducerProxy) IsOk() bool

IsOk 检查代理是否已经可用

func (*ProducerProxy) IsWatchingDeliver

func (proxy *ProducerProxy) IsWatchingDeliver() bool

IsWatchingDeliver( 检查代理是否正在监听发送情况

func (*ProducerProxy) OnDelivery

func (proxy *ProducerProxy) OnDelivery(cb ...DeliveryCallback) error

OnDelivery 注册当发未设置WithoutConfirmDelivery且发送成功时执行的回调 params cb ...DeliveryCallback 当发未设置WithoutConfirmDelivery且发送成功时执行的回调

func (*ProducerProxy) OnDeliveryError

func (proxy *ProducerProxy) OnDeliveryError(cb ...DeliveryCallback) error

OnDeliveryError 注册当发未设置WithoutConfirmDelivery且发送失败时执行的回调 params cb ...DeliveryCallback 当发未设置WithoutConfirmDelivery且发送失败时执行的回调

func (*ProducerProxy) OnDeliveryUnknownEventCallback

func (proxy *ProducerProxy) OnDeliveryUnknownEventCallback(cb ...DeliveryUnknownEventCallback) error

OnDeliveryUnknownEventCallback 注册当发未设置WithoutConfirmDelivery且监听到未知类型事件时的回调 params cb ...DeliveryUnknownEventCallback 当发未设置WithoutConfirmDelivery且监听到未知类型事件时的回调

func (*ProducerProxy) Regist

func (proxy *ProducerProxy) Regist(cb ...SetConnectCallback) error

Regist 注册回调函数,在init执行后执行回调函数 如果对象已经设置了被代理客户端则无法再注册回调函数 @params cb ...Callback 回调函数

func (*ProducerProxy) Send

func (proxy *ProducerProxy) Send(msg *kafka.Message)

func (*ProducerProxy) SendAndWait

func (proxy *ProducerProxy) SendAndWait(msg *kafka.Message) error

func (*ProducerProxy) SetConnect

func (proxy *ProducerProxy) SetConnect(cli *kafka.Producer) error

SetConnect 设置连接的客户端 @params cli *kafka.Producer 被代理

func (*ProducerProxy) StartConfirmDelivery

func (proxy *ProducerProxy) StartConfirmDelivery() error

StartConfirmDelivery 启动发送消息验收,用于确认发送成功

type SetConnectCallback

type SetConnectCallback func(cli *kafka.Producer) error

SetConnectCallback

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL