rocketmq

package
v0.0.0-...-a89d410 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 5, 2024 License: Apache-2.0 Imports: 37 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	Addresses    []string            `json:"addr" toml:"addr"`
	PushConsumer *PushConsumerConfig `json:"consumer" toml:"consumer" mapstructure:",squash"`
	PullConsumer *PullConsumerConfig `json:"pullConsumer" toml:"pullConsumer" mapstructure:",squash"`
	Producer     *ProducerConfig     `json:"producer" toml:"producer"`
}

Config config...

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig ...

type ConsumerDefaultConfig

type ConsumerDefaultConfig struct {
	Name          string        `json:"name" toml:"name"`
	Enable        bool          `json:"enable" toml:"enable"`
	Addr          []string      `json:"addr" toml:"addr"`
	Topic         string        `json:"topic" toml:"topic"`
	Group         string        `json:"group" toml:"group"`
	DialTimeout   time.Duration `json:"dialTimeout" toml:"dialTimeout"`
	SubExpression string        `json:"subExpression" toml:"subExpression"`
	// 最大重复消费次数
	Reconsume    int32  `json:"reconsume" toml:"reconsume"`
	AccessKey    string `json:"accessKey" toml:"accessKey"`
	SecretKey    string `json:"secretKey" toml:"secretKey"`
	MessageModel string `json:"messageModel" toml:"messageModel"` // 消费模式,默认clustering
	// client实例名,默认会基于Addr字段生成md5,支持多集群
	InstanceName string `json:"instanceName" toml:"instanceName"`
	// 批量消费的最大消息数量,取值范围:[1, 1024],默认值为1
	ConsumeMessageBatchMaxSize int `json:"consumeMessageBatchMaxSize" toml:"consumeMessageBatchMaxSize"`
	// 每批次从broker拉取消息的最大个数,取值范围:[1, 1024],默认值为32
	PullBatchSize int32 `json:"pullBatchSize" toml:"pullBatchSize"`
	// 设置每次消息拉取的时间间隔,push模式最大为65535*time.Millisecond
	PullInterval time.Duration `json:"pullInterval" toml:"pullInterval"`
	// 是否开启trace
	EnableTrace bool `json:"enableTrace" toml:"enableTrace"`
}

type FlowInfo

type FlowInfo struct {
	Name      string   `json:"name"`
	Addr      []string `json:"addr"`
	Topic     string   `json:"topic"`
	Group     string   `json:"group"`
	GroupType string   `json:"groupType"` // 类型, consumer 消费者, producer 生产者
	istats.FlowInfoBase
}

type Producer

type Producer struct {
	rocketmq.Producer

	ProducerConfig
	// contains filtered or unexported fields
}

func StdNewProducer

func StdNewProducer(name string) *Producer

func (*Producer) Close

func (pc *Producer) Close() error

func (*Producer) MustStart

func (pc *Producer) MustStart()

MustStart panics when error found.

func (*Producer) SendWithContext

func (pc *Producer) SendWithContext(ctx context.Context, msg []byte) error

SendWithContext 发送消息

func (*Producer) SendWithMsg

func (pc *Producer) SendWithMsg(ctx context.Context, msg *primitive.Message) error

SendWithMsg 发送消息,可以自定义选择tag

func (*Producer) Start

func (pc *Producer) Start() error

func (*Producer) WithInterceptor

func (pc *Producer) WithInterceptor(fs ...primitive.Interceptor) *Producer

type ProducerConfig

type ProducerConfig struct {
	Name        string        `json:"name" toml:"name"`
	Addr        []string      `json:"addr" toml:"addr"`
	Topic       string        `json:"topic" toml:"topic"`
	Group       string        `json:"group" toml:"group"`
	Retry       int           `json:"retry" toml:"retry"`
	DialTimeout time.Duration `json:"dialTimeout" toml:"dialTimeout"`
	RwTimeout   time.Duration `json:"rwTimeout" toml:"rwTimeout"`
	AccessKey   string        `json:"accessKey" toml:"accessKey"`
	SecretKey   string        `json:"secretKey" toml:"secretKey"`
	// client实例名,默认会基于Addr字段生成md5,支持多集群
	InstanceName string `json:"instanceName" toml:"instanceName"`
	EnableTrace  bool   `json:"enableTrace" toml:"enableTrace"`
}

ProducerConfig producer config

func RawProducerConfig

func RawProducerConfig(name string) *ProducerConfig

RawProducerConfig 返回produce配置 nolint:dupl

func StdProducerConfig

func StdProducerConfig(name string) *ProducerConfig

StdProducerConfig ...

func (*ProducerConfig) Build

func (conf *ProducerConfig) Build() *Producer

type PullConsumer

type PullConsumer struct {
	rocketmq.PullConsumer

	PullConsumerConfig
	// contains filtered or unexported fields
}

func (*PullConsumer) Close

func (cc *PullConsumer) Close()

func (*PullConsumer) MustStart

func (cc *PullConsumer) MustStart()

MustStart panics when error found.

func (*PullConsumer) Poll

func (cc *PullConsumer) Poll(ctx context.Context, f func(context.Context, []*primitive.MessageExt) error)

func (*PullConsumer) Start

func (cc *PullConsumer) Start() error

type PullConsumerConfig

type PullConsumerConfig struct {
	ConsumerDefaultConfig
	// 持久化offset间隔
	RefreshPersistOffsetDuration time.Duration `json:"refreshPersistOffsetDuration" toml:"refreshPersistOffsetDuration"`
	PollTimeout                  time.Duration `json:"pollTimeout" toml:"pollTimeout"`
}

PullConsumerConfig pull consumer config

func RawPullConsumerConfig

func RawPullConsumerConfig(name string) *PullConsumerConfig

RawPullConsumerConfig 返回pull consume配置 nolint:dupl

func StdPullConsumerConfig

func StdPullConsumerConfig(name string) *PullConsumerConfig

StdPullConsumerConfig ...

func (*PullConsumerConfig) Build

func (conf *PullConsumerConfig) Build() *PullConsumer

type PushConsumer

type PushConsumer struct {
	rocketmq.PushConsumer

	PushConsumerConfig
	// contains filtered or unexported fields
}

func (*PushConsumer) Close

func (cc *PushConsumer) Close()

func (*PushConsumer) MustStart

func (cc *PushConsumer) MustStart()

MustStart panics when error found.

func (*PushConsumer) RegisterBatchMessage

func (cc *PushConsumer) RegisterBatchMessage(f func(context.Context, ...*primitive.MessageExt) error) *PushConsumer

func (*PushConsumer) RegisterSingleMessage

func (cc *PushConsumer) RegisterSingleMessage(f func(context.Context, *primitive.MessageExt) error) *PushConsumer

func (*PushConsumer) Start

func (cc *PushConsumer) Start() error

func (*PushConsumer) WithInterceptor

func (cc *PushConsumer) WithInterceptor(fs ...primitive.Interceptor) *PushConsumer

type PushConsumerConfig

type PushConsumerConfig struct {
	ConsumerDefaultConfig
	RwTimeout       time.Duration `json:"rwTimeout" toml:"rwTimeout"`
	Rate            float64       `json:"rate" toml:"rate"`
	Capacity        int64         `json:"capacity" toml:"capacity"`
	WaitMaxDuration time.Duration `json:"waitMaxDuration" toml:"waitMaxDuration"`
	// 消费消息的协程数,默认为20
	ConsumeGoroutineNums int `json:"consumeGoroutineNums" toml:"consumeGoroutineNums"`
}

PushConsumerConfig push consumer config

func RawPushConsumerConfig

func RawPushConsumerConfig(name string) *PushConsumerConfig

RawPushConsumerConfig 返push consume回配置 nolint:dupl

func StdPushConsumerConfig

func StdPushConsumerConfig(name string) *PushConsumerConfig

StdPushConsumerConfig ...

func (*PushConsumerConfig) Build

func (conf *PushConsumerConfig) Build() *PushConsumer

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL