Documentation ¶
Index ¶
Constants ¶
View Source
// Package-wide keys, tag names, and default values.
const (
	// TraceKey is the literal key "trace-key".
	// NOTE(review): presumably the key under which trace context is carried — confirm at use sites.
	TraceKey = "trace-key"

	// Tag names for Pulsar message attributes.
	// NOTE(review): presumably attached to tracing spans — confirm at use sites.
	TagPulsarMQTopic        = "pulsar.topic"
	TagPulsarMQPayload      = "pulsar.payload"
	TagPulsarMQBatchPayload = "pulsar.batch.payload"
	TagPulsarMQConsumerMode = "pulsar.consumer.mode"

	// ForTopicTimeout is the maximum execution time of ForTopic.
	ForTopicTimeout time.Duration = 5 * time.Second

	// Default flow settings.
	// NOTE(review): presumably the fallbacks for ConsumerList.FlowPeriodSecond
	// and ConsumerList.FlowPermit — confirm against the constructor.
	DefaultFlowPeriodSecond = 60
	DefaultFlowPermit       = 10
)
Variables ¶
View Source
// ErrNoPartitionProducer is a sentinel error with the message
// "no partition producer"; compare against it with errors.Is.
// NOTE(review): the conditions under which it is returned are not visible here.
var ErrNoPartitionProducer error = errors.New("no partition producer")
View Source
// MessageHandleDuration is a Prometheus histogram vector named
// "pulsar_message_request_duration_seconds", partitioned by the "topic"
// label. Its buckets have upper bounds 0.005, 0.01, 0.015, 0.02, 0.03 and 1.
// NOTE(review): bounds are presumably seconds, per the metric name — confirm.
var MessageHandleDuration = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name:    "pulsar_message_request_duration_seconds",
		Help:    "pulsar message request duration distribution",
		Buckets: []float64{0.005, 0.01, 0.015, 0.02, 0.03, 1},
	},
	[]string{"topic"},
)
A histogram-type metric; each bucket represents an interval in the distribution of durations.
Functions ¶
Types ¶
type AuthProvider ¶
type Client ¶
type Client interface { NewConsumer(config ConsumerConfig) (Consumer, error) NewProducer(config ProducerConfig) (Producer, error) NewProducers(config ProducersConfig) (Producers, error) }
func NewClient ¶
func NewClient(cfg ClientConfig) Client
type ClientConfig ¶
type ClientConfig struct { PulsarAddr string Auth AuthProvider TlsInsecureSkipVerify bool }
type Consumer ¶
type Consumer interface { ReceiveAndHandle(ctx context.Context, handler PayloadHandler) Close() error }
type ConsumerConfig ¶
type ConsumerList ¶
// ConsumerList has Close and ReceiveAndHandle methods matching the Consumer
// interface, plus a CronFlow method.
// NOTE(review): presumably wraps a group of consumers — confirm.
type ConsumerList struct {
	// FlowPeriodSecond is presumably the interval, in seconds, used by
	// CronFlow (see DefaultFlowPeriodSecond) — confirm.
	FlowPeriodSecond int

	// FlowPermit is presumably the number of permits requested per flow
	// (see DefaultFlowPermit) — confirm.
	FlowPermit uint32

	// contains filtered or unexported fields
}
func (*ConsumerList) Close ¶
func (l *ConsumerList) Close() error
func (*ConsumerList) CronFlow ¶
func (l *ConsumerList) CronFlow()
func (*ConsumerList) ReceiveAndHandle ¶
func (l *ConsumerList) ReceiveAndHandle(ctx context.Context, handler PayloadHandler)
type PayloadHandler ¶
type ProducerConfig ¶
type ProducersConfig ¶
Click to show internal directories.
Click to hide internal directories.