consumer

package
v2.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 3, 2022 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Overview

Package consumer is a generated GoMock package.

Index

Constants

View Source
const (
	/**
	 * <ul>
	 * Keywords:
	 * <li>{@code AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL}</li>
	 * </ul>
	 * <p/>
	 * <ul>
	 * Data type:
	 * <li>Boolean, like: TRUE, FALSE</li>
	 * <li>String, like: 'abc'</li>
	 * <li>Decimal, like: 123</li>
	 * <li>Float number, like: 3.1415</li>
	 * </ul>
	 * <p/>
	 * <ul>
	 * Grammar:
	 * <li>{@code AND, OR}</li>
	 * <li>{@code >, >=, <, <=, =}</li>
	 * <li>{@code BETWEEN A AND B}, equals to {@code >=A AND <=B}</li>
	 * <li>{@code NOT BETWEEN A AND B}, equals to {@code >B OR <A}</li>
	 * <li>{@code IN ('a', 'b')}, equals to {@code ='a' OR ='b'}, this operation only support String type.</li>
	 * <li>{@code IS NULL}, {@code IS NOT NULL}, check parameter whether is null, or not.</li>
	 * <li>{@code =TRUE}, {@code =FALSE}, check parameter whether is true, or false.</li>
	 * </ul>
	 * <p/>
	 * <p>
	 * Example:
	 * (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)
	 * </p>
	 */
	SQL92 = ExpressionType("SQL92")

	/**
	 * Only support or operation such as
	 * "tag1 || tag2 || tag3", <br>
	 * If null or * expression, meaning subscribe all.
	 */
	TAG = ExpressionType("TAG")
)
View Source
const (
	Mb = 1024 * 1024
)

Variables

This section is empty.

Functions

func AllocateByAveragely

func AllocateByAveragely(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue,
	cidAll []string) []*primitive.MessageQueue

func AllocateByAveragelyCircle

func AllocateByAveragelyCircle(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue,
	cidAll []string) []*primitive.MessageQueue

func AllocateByMachineNearby

func AllocateByMachineNearby(consumerGroup, currentCID string, mqAll []*primitive.MessageQueue,
	cidAll []string) []*primitive.MessageQueue

TODO

func IsTagType

func IsTagType(exp string) bool

func NewPullConsumer

func NewPullConsumer(options ...Option) (*defaultPullConsumer, error)

func NewPushConsumer

func NewPushConsumer(opts ...Option) (*pushConsumer, error)

Types

type AllocateStrategy

type AllocateStrategy func(string, string, []*primitive.MessageQueue, []string) []*primitive.MessageQueue

func AllocateByConfig

func AllocateByConfig(list []*primitive.MessageQueue) AllocateStrategy

func AllocateByConsistentHash

func AllocateByConsistentHash(virtualNodeCnt int) AllocateStrategy

func AllocateByMachineRoom

func AllocateByMachineRoom(consumeridcs []string) AllocateStrategy

type ConsumeFromWhere

type ConsumeFromWhere int

Consuming point on consumer booting. </p>

There are three consuming points: <ul> <li> <code>CONSUME_FROM_LAST_OFFSET</code>: consumer clients pick up where it stopped previously. If it were a newly booting up consumer client, according aging of the consumer group, there are two cases: <ol> <li> if the consumer group is created so recently that the earliest message being subscribed has yet expired, which means the consumer group represents a lately launched business, consuming will start from the very beginning; </li> <li> if the earliest message being subscribed has expired, consuming will start from the latest messages, meaning messages born prior to the booting timestamp would be ignored. </li> </ol> </li> <li> <code>CONSUME_FROM_FIRST_OFFSET</code>: Consumer client will start from earliest messages available. </li> <li> <code>CONSUME_FROM_TIMESTAMP</code>: Consumer client will start from specified timestamp, which means messages born prior to {@link #consumeTimestamp} will be ignored </li> </ul>

const (
	ConsumeFromLastOffset ConsumeFromWhere = iota
	ConsumeFromFirstOffset
	ConsumeFromTimestamp
)

type ConsumeResult

type ConsumeResult int
const (
	ConsumeSuccess ConsumeResult = iota
	ConsumeRetryLater
	Commit
	Rollback
	SuspendCurrentQueueAMoment
)

type ConsumeResultHolder

type ConsumeResultHolder struct {
	ConsumeResult
}

type ConsumeStatus

type ConsumeStatus struct {
	PullRT            float64
	PullTPS           float64
	ConsumeRT         float64
	ConsumeOKTPS      float64
	ConsumeFailedTPS  float64
	ConsumeFailedMsgs int64
}

type ConsumeType

type ConsumeType string

type ConsumerReturn

type ConsumerReturn int
const (
	SuccessReturn ConsumerReturn = iota
	ExceptionReturn
	NullReturn
	TimeoutReturn
	FailedReturn
)

type ExpressionType

type ExpressionType string

type MessageModel

type MessageModel int

Message model defines the way how messages are delivered to each consumer clients. </p>

RocketMQ supports two message models: clustering and broadcasting. If clustering is set, consumer clients with the same {@link #ConsumerGroup} would only consume shards of the messages subscribed, which achieves load balances; Conversely, if the broadcasting is set, each consumer client will consume all subscribed messages separately. </p>

This field defaults to clustering.

const (
	BroadCasting MessageModel = iota
	Clustering
)

func (MessageModel) String

func (mode MessageModel) String() string

type MessageQueueKey

type MessageQueueKey primitive.MessageQueue

func (MessageQueueKey) MarshalText

func (mq MessageQueueKey) MarshalText() (text []byte, err error)

func (*MessageQueueKey) UnmarshalText

func (mq *MessageQueueKey) UnmarshalText(text []byte) error

type MessageSelector

type MessageSelector struct {
	Type       ExpressionType
	Expression string
}

type MockOffsetStore

type MockOffsetStore struct {
	// contains filtered or unexported fields
}

MockOffsetStore is a mock of OffsetStore interface.

func NewMockOffsetStore

func NewMockOffsetStore(ctrl *gomock.Controller) *MockOffsetStore

NewMockOffsetStore creates a new mock instance.

func (*MockOffsetStore) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

type MockOffsetStoreMockRecorder

type MockOffsetStoreMockRecorder struct {
	// contains filtered or unexported fields
}

MockOffsetStoreMockRecorder is the mock recorder for MockOffsetStore.

type OffsetSerializeWrapper

type OffsetSerializeWrapper struct {
	OffsetTable map[MessageQueueKey]int64 `json:"offsetTable"`
}

type OffsetStore

type OffsetStore interface {
	// contains filtered or unexported methods
}

func NewLocalFileOffsetStore

func NewLocalFileOffsetStore(clientID, group string) OffsetStore

func NewRemoteOffsetStore

func NewRemoteOffsetStore(group string, client internal.RMQClient, namesrv internal.Namesrvs) OffsetStore

type Option

type Option func(*consumerOptions)

func WithAutoCommit

func WithAutoCommit(auto bool) Option

func WithConsumeConcurrentlyMaxSpan

func WithConsumeConcurrentlyMaxSpan(consumeConcurrentlyMaxSpan int) Option

func WithConsumeFromWhere

func WithConsumeFromWhere(w ConsumeFromWhere) Option

func WithConsumeMessageBatchMaxSize

func WithConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize int) Option

func WithConsumeTimestamp

func WithConsumeTimestamp(consumeTimestamp string) Option

func WithConsumerModel

func WithConsumerModel(m MessageModel) Option

func WithConsumerOrder

func WithConsumerOrder(order bool) Option

func WithConsumerPullTimeout

func WithConsumerPullTimeout(consumerPullTimeout time.Duration) Option

func WithCredentials

func WithCredentials(c primitive.Credentials) Option

func WithGroupName

func WithGroupName(group string) Option

WithGroupName set group name address

func WithInstance

func WithInstance(name string) Option

func WithInterceptor

func WithInterceptor(fs ...primitive.Interceptor) Option

WithChainConsumerInterceptor returns a ConsumerOption that specifies the chained interceptor for consumer. The first interceptor will be the outer most, while the last interceptor will be the inner most wrapper around the real call.

func WithMaxReconsumeTimes

func WithMaxReconsumeTimes(times int32) Option

WithMaxReconsumeTimes set MaxReconsumeTimes of options, if message reconsume greater than MaxReconsumeTimes, it will be sent to retry or dlq topic. more info reference by examples/consumer/retry.

func WithNameServer

func WithNameServer(nameServers primitive.NamesrvAddr) Option

WithNameServer set NameServer address, only support one NameServer cluster in alpha2

func WithNameServerDomain

func WithNameServerDomain(nameServerUrl string) Option

WithNameServerDomain set NameServer domain

func WithNamespace

func WithNamespace(namespace string) Option

WithNamespace set the namespace of consumer

func WithNsResolver

func WithNsResolver(resolver primitive.NsResolver) Option

WithNsResolver set nameserver resolver to fetch nameserver addr

func WithPullBatchSize

func WithPullBatchSize(batchSize int32) Option

func WithPullInterval

func WithPullInterval(interval time.Duration) Option

func WithRebalanceLockInterval

func WithRebalanceLockInterval(interval time.Duration) Option

func WithRetry

func WithRetry(retries int) Option

WithRetry return a Option that specifies the retry times when send failed. TODO: use retry middleware instead

func WithStrategy

func WithStrategy(strategy AllocateStrategy) Option

func WithSuspendCurrentQueueTimeMillis

func WithSuspendCurrentQueueTimeMillis(suspendT time.Duration) Option

func WithTrace

func WithTrace(traceCfg *primitive.TraceConfig) Option

WithTrace support rocketmq trace: https://github.com/apache/rocketmq/wiki/RIP-6-Message-Trace.

func WithVIPChannel

func WithVIPChannel(enable bool) Option

type PullConsumer

type PullConsumer interface {
	// Start
	Start()

	// Shutdown refuse all new pull operation, finish all submitted.
	Shutdown()

	// Pull pull message of topic,  selector indicate which queue to pull.
	Pull(ctx context.Context, topic string, selector MessageSelector, numbers int) (*primitive.PullResult, error)

	// PullFrom pull messages of queue from the offset to offset + numbers
	PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)

	// updateOffset update offset of queue in mem
	UpdateOffset(queue *primitive.MessageQueue, offset int64) error

	// PersistOffset persist all offset in mem.
	PersistOffset(ctx context.Context) error

	// CurrentOffset return the current offset of queue in mem.
	CurrentOffset(queue *primitive.MessageQueue) (int64, error)
}

type PullRequest

type PullRequest struct {
	// contains filtered or unexported fields
}

func (*PullRequest) String

func (pr *PullRequest) String() string

type PushConsumerCallback

type PushConsumerCallback struct {
	// contains filtered or unexported fields
}

func (PushConsumerCallback) UniqueID

func (callback PushConsumerCallback) UniqueID() string

type QueueLock

type QueueLock struct {
	// contains filtered or unexported fields
}

type StatsManager

type StatsManager struct {
	// contains filtered or unexported fields
}

func NewStatsManager

func NewStatsManager() *StatsManager

func (*StatsManager) GetConsumeStatus

func (mgr *StatsManager) GetConsumeStatus(group, topic string) ConsumeStatus

func (*StatsManager) ShutDownStat

func (mgr *StatsManager) ShutDownStat()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL