Version: v2.1.8 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Jul 20, 2020 License: Apache-2.0 Imports: 26 Imported by: 2



Package consumer is a generated GoMock package.



View Source
const (
	// Keywords: AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL
	//
	// Data types:
	//   - Boolean, like: TRUE, FALSE
	//   - String, like: 'abc'
	//   - Decimal, like: 123
	//   - Float number, like: 3.1415
	//
	// Grammar:
	//   - AND, OR
	//   - >, >=, <, <=, =
	//   - BETWEEN A AND B, equivalent to >=A AND <=B
	//   - NOT BETWEEN A AND B, equivalent to >B OR <A
	//   - IN ('a', 'b'), equivalent to ='a' OR ='b'; this operation only supports the String type.
	//   - IS NULL, IS NOT NULL: check whether the parameter is null or not.
	//   - =TRUE, =FALSE: check whether the parameter is true or false.
	//
	// Example:
	//   (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)
	SQL92 = ExpressionType("SQL92")

	// Only the OR operation is supported, such as
	// "tag1 || tag2 || tag3".
	// If the expression is null or "*", all messages are subscribed.
	TAG = ExpressionType("TAG")
View Source
const (
	Mb = 1024 * 1024


View Source
var (
	ErrCreated        = errors.New("consumer group has been created")
	ErrBrokerNotFound = errors.New("broker can not found")


func AllocateByAveragely

func AllocateByAveragely(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue,
	cidAll []string) []*primitive.MessageQueue

func AllocateByAveragelyCircle

func AllocateByAveragelyCircle(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue,
	cidAll []string) []*primitive.MessageQueue

func AllocateByMachineNearby

func AllocateByMachineNearby(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue,
	cidAll []string) []*primitive.MessageQueue


func IsTagType

func IsTagType(exp string) bool

func NewPullConsumer

func NewPullConsumer(options ...Option) (*defaultPullConsumer, error)

func NewPushConsumer

func NewPushConsumer(opts ...Option) (*pushConsumer, error)

func ShutDownStatis

func ShutDownStatis()


type AllocateStrategy

type AllocateStrategy func(string, string, []*primitive.MessageQueue, []string) []*primitive.MessageQueue

func AllocateByConfig

func AllocateByConfig(list []*primitive.MessageQueue) AllocateStrategy

func AllocateByConsistentHash

func AllocateByConsistentHash(virtualNodeCnt int) AllocateStrategy

func AllocateByMachineRoom

func AllocateByMachineRoom(consumeridcs []string) AllocateStrategy

type ConsumeFromWhere

type ConsumeFromWhere int

Consuming point on consumer booting.

There are three consuming points:

- CONSUME_FROM_LAST_OFFSET: the consumer client picks up where it stopped previously. If it is a newly booted consumer client, there are two cases depending on the age of the consumer group: (1) if the consumer group was created so recently that the earliest subscribed message has not yet expired, which means the consumer group represents a recently launched business, consuming starts from the very beginning; (2) if the earliest subscribed message has expired, consuming starts from the latest messages, meaning messages born prior to the booting timestamp are ignored.

- CONSUME_FROM_FIRST_OFFSET: the consumer client starts from the earliest messages available.

- CONSUME_FROM_TIMESTAMP: the consumer client starts from the specified timestamp, which means messages born prior to consumeTimestamp are ignored.

const (
	ConsumeFromLastOffset ConsumeFromWhere = iota

type ConsumeResult

type ConsumeResult int
const (
	ConsumeSuccess ConsumeResult = iota

type ConsumeResultHolder

type ConsumeResultHolder struct {

type ConsumeStatus

type ConsumeStatus struct {
	PullRT            float64
	PullTPS           float64
	ConsumeRT         float64
	ConsumeOKTPS      float64
	ConsumeFailedTPS  float64
	ConsumeFailedMsgs int64

func GetConsumeStatus

func GetConsumeStatus(group, topic string) ConsumeStatus

type ConsumeType

type ConsumeType string

type ConsumerReturn

type ConsumerReturn int
const (
	SuccessReturn ConsumerReturn = iota

type ExpressionType

type ExpressionType string

type MessageModel

type MessageModel int

Message model defines how messages are delivered to each consumer client.

RocketMQ supports two message models: clustering and broadcasting. If clustering is set, consumer clients with the same ConsumerGroup each consume only a share of the subscribed messages, which achieves load balancing; conversely, if broadcasting is set, each consumer client consumes all subscribed messages separately.

This field defaults to clustering.

const (
	BroadCasting MessageModel = iota

func (MessageModel) String

func (mode MessageModel) String() string

type MessageQueueKey

type MessageQueueKey primitive.MessageQueue

func (MessageQueueKey) MarshalText

func (mq MessageQueueKey) MarshalText() (text []byte, err error)

func (*MessageQueueKey) UnmarshalText

func (mq *MessageQueueKey) UnmarshalText(text []byte) error

type MessageSelector

type MessageSelector struct {
	Type       ExpressionType
	Expression string

type MockOffsetStore

type MockOffsetStore struct {
	// contains filtered or unexported fields

MockOffsetStore is a mock of OffsetStore interface

func NewMockOffsetStore

func NewMockOffsetStore(ctrl *gomock.Controller) *MockOffsetStore

NewMockOffsetStore creates a new mock instance

func (*MockOffsetStore) EXPECT

EXPECT returns an object that allows the caller to indicate expected use

type MockOffsetStoreMockRecorder

type MockOffsetStoreMockRecorder struct {
	// contains filtered or unexported fields

MockOffsetStoreMockRecorder is the mock recorder for MockOffsetStore

type OffsetSerializeWrapper

type OffsetSerializeWrapper struct {
	OffsetTable map[MessageQueueKey]int64 `json:"offsetTable"`

type OffsetStore

type OffsetStore interface {
	// contains filtered or unexported methods

func NewLocalFileOffsetStore

func NewLocalFileOffsetStore(clientID, group string) OffsetStore

func NewRemoteOffsetStore

func NewRemoteOffsetStore(group string, client internal.RMQClient, namesrv internal.Namesrvs) OffsetStore

type Option

type Option func(*consumerOptions)

func WithAutoCommit

func WithAutoCommit(auto bool) Option

func WithConsumeFromWhere

func WithConsumeFromWhere(w ConsumeFromWhere) Option

func WithConsumeMessageBatchMaxSize

func WithConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize int) Option

func WithConsumerModel

func WithConsumerModel(m MessageModel) Option

func WithConsumerOrder

func WithConsumerOrder(order bool) Option

func WithCredentials

func WithCredentials(c primitive.Credentials) Option

func WithGroupName

func WithGroupName(group string) Option

WithGroupName sets the consumer group name.

func WithInstance

func WithInstance(name string) Option

func WithInterceptor

func WithInterceptor(fs ...primitive.Interceptor) Option

WithInterceptor returns an Option that specifies the chained interceptors for the consumer. The first interceptor will be the outermost, while the last interceptor will be the innermost wrapper around the real call.

func WithMaxReconsumeTimes

func WithMaxReconsumeTimes(times int32) Option

WithMaxReconsumeTimes sets MaxReconsumeTimes of the options. If a message is reconsumed more than MaxReconsumeTimes times, it will be sent to the retry or DLQ topic. For more information, see examples/consumer/retry.

func WithNameServer

func WithNameServer(nameServers primitive.NamesrvAddr) Option

WithNameServer sets the NameServer address. Only one NameServer cluster is supported in alpha2.

func WithNameServerDomain

func WithNameServerDomain(nameServerUrl string) Option

WithNameServerDomain sets the NameServer domain.

func WithNamespace

func WithNamespace(namespace string) Option

WithNamespace sets the namespace of the consumer.

func WithNsResovler

func WithNsResovler(resolver primitive.NsResolver) Option

WithNsResovler sets the name server resolver used to fetch name server addresses.

func WithPullBatchSize

func WithPullBatchSize(batchSize int32) Option

func WithPullInterval

func WithPullInterval(interval time.Duration) Option

func WithRebalanceLockInterval

func WithRebalanceLockInterval(interval time.Duration) Option

func WithRetry

func WithRetry(retries int) Option

WithRetry returns an Option that specifies the number of retries when sending fails. TODO: use retry middleware instead.

func WithStrategy

func WithStrategy(strategy AllocateStrategy) Option

func WithSuspendCurrentQueueTimeMillis

func WithSuspendCurrentQueueTimeMillis(suspendT time.Duration) Option

func WithTrace

func WithTrace(traceCfg *primitive.TraceConfig) Option

WithTrace enables RocketMQ trace support using the given trace configuration.

func WithVIPChannel

func WithVIPChannel(enable bool) Option

type PullConsumer

type PullConsumer interface {
	// Start

	// Shutdown refuses all new pull operations and finishes all submitted ones.

	// Pull pulls messages of topic; selector indicates which queue to pull.
	Pull(ctx context.Context, topic string, selector MessageSelector, numbers int) (*primitive.PullResult, error)

	// PullFrom pull messages of queue from the offset to offset + numbers
	PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)

	// UpdateOffset updates the offset of queue in memory.
	UpdateOffset(queue *primitive.MessageQueue, offset int64) error

	// PersistOffset persist all offset in mem.
	PersistOffset(ctx context.Context) error

	// CurrentOffset return the current offset of queue in mem.
	CurrentOffset(queue *primitive.MessageQueue) (int64, error)

type PullRequest

type PullRequest struct {
	// contains filtered or unexported fields

func (*PullRequest) String

func (pr *PullRequest) String() string

type PushConsumerCallback

type PushConsumerCallback struct {
	// contains filtered or unexported fields

func (PushConsumerCallback) UniqueID

func (callback PushConsumerCallback) UniqueID() string

type QueueLock

type QueueLock struct {
	// contains filtered or unexported fields

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to