pulsar

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 31, 2022 License: GPL-3.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const (
	TraceKey                = "trace-key"
	TagPulsarMQTopic        = "pulsar.topic"
	TagPulsarMQPayload      = "pulsar.payload"
	TagPulsarMQBatchPayload = "pulsar.batch.payload"
	TagPulsarMQConsumerMode = "pulsar.consumer.mode"

	// ForTopicTimeout ForTopic最大执行时常
	ForTopicTimeout time.Duration = 5 * time.Second

	DefaultFlowPeriodSecond = 60
	DefaultFlowPermit       = 10
)

Variables

View Source
var ErrNoPartitionProducer = errors.New("no partition producer")
View Source
var MessageHandleDuration = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name:    "pulsar_message_request_duration_seconds",
		Help:    "pulsar message request duration distribution",
		Buckets: []float64{0.005, 0.01, 0.015, 0.02, 0.03, 1},
	},
	[]string{"topic"},
)

Histogram类型指标,bucket代表duration的分布区间

Functions

func SetTracer

func SetTracer(t *go2sky.Tracer)

Types

type AuthProvider

type AuthProvider interface {
	AuthMethod() string
	AuthData() []byte
}

type Client

type Client interface {
	NewConsumer(config ConsumerConfig) (Consumer, error)
	NewProducer(config ProducerConfig) (Producer, error)
	NewProducers(config ProducersConfig) (Producers, error)
}

func NewClient

func NewClient(cfg ClientConfig) Client

type ClientConfig

type ClientConfig struct {
	PulsarAddr            string
	Auth                  AuthProvider
	TlsInsecureSkipVerify bool
}

type Consumer

type Consumer interface {
	ReceiveAndHandle(ctx context.Context, handler PayloadHandler)
	Close() error
}

type ConsumerConfig

type ConsumerConfig struct {
	Topic        string
	Subscription string
	Wg           *sync.WaitGroup
	// 一般不需要设置,默认为128。模拟消息堆积时,可以设置小一些
	QueueSize int
}

type ConsumerList

type ConsumerList struct {
	FlowPeriodSecond int
	FlowPermit       uint32
	// contains filtered or unexported fields
}

func (*ConsumerList) Close

func (l *ConsumerList) Close() error

func (*ConsumerList) CronFlow

func (l *ConsumerList) CronFlow()

func (*ConsumerList) ReceiveAndHandle

func (l *ConsumerList) ReceiveAndHandle(ctx context.Context, handler PayloadHandler)

type Message

type Message = msg.Message

type NoAuth

type NoAuth struct {
}

func (*NoAuth) AuthData

func (a *NoAuth) AuthData() []byte

func (*NoAuth) AuthMethod

func (a *NoAuth) AuthMethod() string

type PayloadHandler

type PayloadHandler interface {
	HandlePayload(ctx context.Context, msg *Message, payload []byte) error
}

type Producer

type Producer interface {
	Publish(ctx context.Context, payload []byte) error
}

type ProducerConfig

type ProducerConfig struct {
	Topic string
	// 发送消息,失败重试次数,默认为3
	PublishRetry int
	// send 超时时间 默认为5s
	PublishTimeout time.Duration
}

type Producers

type Producers interface {
	Publish(ctx context.Context, payload []byte, partition uint32) error
	Partition() int
}

type ProducersConfig

type ProducersConfig struct {
	Topic string
	// 发送消息,失败重试次数,默认为3
	PublishRetry int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL