rocketmq

package
v1.2.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 3, 2021 License: Apache-2.0 Imports: 7 Imported by: 243

Documentation

Index

Constants

View Source
// Supported ProducerModel values (see ProducerConfig.ProducerModel).
const (
	CommonProducer  ProducerModel = 1
	OrderlyProducer ProducerModel = 2
	TransProducer   ProducerModel = 3
)

Different models

View Source
// Supported MessageModel values (see PushConsumerConfig.Model).
const (
	BroadCasting MessageModel = 1
	Clustering   MessageModel = 2
)

MessageModel

View Source
// Supported ConsumerModel values (see PushConsumerConfig.ConsumerModel).
const (
	CoCurrently ConsumerModel = 1
	Orderly     ConsumerModel = 2
)

ConsumerModel

View Source
// Predefined error values. Each one converts a status code of the underlying
// C client (referenced through the cgo "C" pseudo-package) to rmqError; NIL
// wraps C.OK.
//
// NOTE(review): ErrPushConsumerStartFailed wraps C.PUSHCONSUMER_ERROR_CODE_START,
// whereas the producer and pull-consumer equivalents wrap *_START_FAILED
// codes — confirm this is the intended C constant.
const (
	NIL                        = rmqError(C.OK)
	ErrNullPoint               = rmqError(C.NULL_POINTER)
	ErrMallocFailed            = rmqError(C.MALLOC_FAILED)
	ErrProducerStartFailed     = rmqError(C.PRODUCER_START_FAILED)
	ErrSendSyncFailed          = rmqError(C.PRODUCER_SEND_SYNC_FAILED)
	ErrSendOnewayFailed        = rmqError(C.PRODUCER_SEND_ONEWAY_FAILED)
	ErrSendOrderlyFailed       = rmqError(C.PRODUCER_SEND_ORDERLY_FAILED)
	ErrPushConsumerStartFailed = rmqError(C.PUSHCONSUMER_ERROR_CODE_START)
	ErrPullConsumerStartFailed = rmqError(C.PULLCONSUMER_START_FAILED)
	ErrFetchMQFailed           = rmqError(C.PULLCONSUMER_FETCH_MQ_FAILED)
	ErrFetchMessageFailed      = rmqError(C.PULLCONSUMER_FETCH_MESSAGE_FAILED)
)

These are the predefined error values.

View Source
// Predefined log levels. Each one converts the C client's E_LOG_LEVEL_*
// constant of the matching name to LogLevel.
//
// NOTE(review): LogLevelNum wraps C.E_LOG_LEVEL_LEVEL_NUM, which presumably is
// the number of levels rather than a selectable level — confirm before using
// it in LogConfig.Level.
const (
	LogLevelFatal = LogLevel(C.E_LOG_LEVEL_FATAL)
	LogLevelError = LogLevel(C.E_LOG_LEVEL_ERROR)
	LogLevelWarn  = LogLevel(C.E_LOG_LEVEL_WARN)
	LogLevelInfo  = LogLevel(C.E_LOG_LEVEL_INFO)
	LogLevelDebug = LogLevel(C.E_LOG_LEVEL_DEBUG)
	LogLevelTrace = LogLevel(C.E_LOG_LEVEL_TRACE)
	LogLevelNum   = LogLevel(C.E_LOG_LEVEL_LEVEL_NUM)
)

predefined log level

View Source
// Predefined send statuses, converted from the C client's E_SEND_* constants.
const (
	// SendOK means the send succeeded.
	SendOK = SendStatus(C.E_SEND_OK)
	// SendFlushDiskTimeout means the send failed because of a broker flush error.
	SendFlushDiskTimeout = SendStatus(C.E_SEND_FLUSH_DISK_TIMEOUT)
	// SendFlushSlaveTimeout means the send failed because the slave broker timed out.
	SendFlushSlaveTimeout = SendStatus(C.E_SEND_FLUSH_SLAVE_TIMEOUT)
	// SendSlaveNotAvailable means the send failed because of a slave broker error.
	SendSlaveNotAvailable = SendStatus(C.E_SEND_SLAVE_NOT_AVAILABLE)
)
View Source
// Predefined pull statuses, converted from the C client's constants
// (E_FOUND, E_NO_NEW_MSG, E_NO_MATCHED_MSG, E_OFFSET_ILLEGAL, E_BROKER_TIMEOUT).
// They are reported in PullResult.Status.
const (
	PullFound         = PullStatus(C.E_FOUND)
	PullNoNewMsg      = PullStatus(C.E_NO_NEW_MSG)
	PullNoMatchedMsg  = PullStatus(C.E_NO_MATCHED_MSG)
	PullOffsetIllegal = PullStatus(C.E_OFFSET_ILLEGAL)
	PullBrokerTimeout = PullStatus(C.E_BROKER_TIMEOUT)
)

predefined pull status

View Source
// Predefined consume statuses, converted from the C client's E_CONSUME_SUCCESS
// and E_RECONSUME_LATER constants.
const (
	// ConsumeSuccess commits the offset to the broker.
	ConsumeSuccess = ConsumeStatus(C.E_CONSUME_SUCCESS)
	// ReConsumeLater means the message will be sent back to the broker.
	ReConsumeLater = ConsumeStatus(C.E_RECONSUME_LATER)
)
View Source
// Predefined transaction statuses (commit, rollback, unknown), converted from
// the C client's E_*_TRANSACTION constants. They are the values returned by
// TransactionLocalListener.Execute and TransactionLocalListener.Check.
const (
	CommitTransaction   = TransactionStatus(C.E_COMMIT_TRANSACTION)
	RollbackTransaction = TransactionStatus(C.E_ROLLBACK_TRANSACTION)
	UnknownTransaction  = TransactionStatus(C.E_UNKNOWN_TRANSACTION)
)
View Source
// GoClientVersion is the constant version string of the Go client.
// NOTE(review): the string reports "V1.2.4" while this page documents module
// version v1.2.5 — confirm whether the constant was meant to be bumped.
const GoClientVersion = "Go Client V1.2.4, Support CPP Core:V1.2.X"

GoClientVersion is the constant version string of the Go client.

Variables

This section is empty.

Functions

func GetVersion

func GetVersion() (version string)

GetVersion returns the Go client version string.

func Version

func Version() (version string)

Version returns the Go SDK version.

Types

type ClientConfig

// ClientConfig holds the client configuration. It is embedded by
// ProducerConfig, PushConsumerConfig and PullConsumerConfig.
//
// Credentials points to the SessionCredentials and LogC to the LogConfig.
// NOTE(review): whether Credentials and LogC may be left nil, and how
// NameServer and NameServerDomain interact, is not visible here — confirm.
type ClientConfig struct {
	GroupID          string
	NameServer       string
	NameServerDomain string
	InstanceName     string
	Credentials      *SessionCredentials
	LogC             *LogConfig
}

ClientConfig holds the client configuration.

func (*ClientConfig) String

func (config *ClientConfig) String() string

type ConsumeStatus

// ConsumeStatus is the value returned by a consume function (see
// PushConsumer.Subscribe): ConsumeSuccess or ReConsumeLater.
type ConsumeStatus int

ConsumeStatus is the return value for a consumer.

func (ConsumeStatus) String

func (status ConsumeStatus) String() string

type ConsumerModel

// ConsumerModel is the consumer model: CoCurrently or Orderly.
type ConsumerModel int

ConsumerModel CoCurrently or Orderly

func (ConsumerModel) String

func (mode ConsumerModel) String() string

type LogConfig

// LogConfig is the log configuration referenced by ClientConfig.LogC.
// It carries the log path, a file count, a file size and the log level.
// NOTE(review): the unit of FileSize (presumably bytes) is not visible
// here — confirm.
type LogConfig struct {
	Path     string
	FileNum  int
	FileSize int64
	Level    LogLevel
}

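LogConfig is the log configuration for a client; it is referenced by ClientConfig.LogC, which the producer, push consumer and pull consumer configurations all embed.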

func (*LogConfig) String

func (lc *LogConfig) String() string

type LogLevel

// LogLevel is the log level; see the LogLevel* constants.
type LogLevel int

LogLevel the log level

func (LogLevel) String

func (l LogLevel) String() string

type Message

// Message is the message used for sending. It is also embedded in MessageExt.
// The body is carried as a string and user properties as a string map.
// NOTE(review): the valid values of DelayTimeLevel are not visible
// here — confirm against the broker's delay-level configuration.
type Message struct {
	Topic          string
	Tags           string
	Keys           string
	Body           string
	DelayTimeLevel int
	Property       map[string]string
	// contains filtered or unexported fields
}

Message used for send

func (*Message) GetProperty added in v1.2.4

func (msg *Message) GetProperty(key string) string

func (*Message) String

func (msg *Message) String() string

type MessageExt

// MessageExt is the message used for consuming. It embeds Message and adds
// the message ID, queue ID, reconsume count, store size, timestamps and
// offsets.
// NOTE(review): the unit of BornTimestamp and StoreTimestamp (presumably
// milliseconds since the Unix epoch) is not visible here — confirm.
type MessageExt struct {
	Message
	MessageID                 string
	QueueId                   int
	ReconsumeTimes            int
	StoreSize                 int
	BornTimestamp             int64
	StoreTimestamp            int64
	QueueOffset               int64
	CommitLogOffset           int64
	PreparedTransactionOffset int64
	// contains filtered or unexported fields
}

MessageExt used for consume

func (*MessageExt) GetProperty

func (msgExt *MessageExt) GetProperty(key string) string

GetProperty returns the message property for the given key from the MessageExt.

func (*MessageExt) String

func (msgExt *MessageExt) String() string

type MessageModel

// MessageModel is the message model: Clustering or BroadCasting.
type MessageModel int

MessageModel Clustering or BroadCasting

func (MessageModel) String

func (mode MessageModel) String() string

type MessageQueue

// MessageQueue identifies a message queue by topic, broker and queue ID.
// It is returned by PullConsumer.FetchSubscriptionMessageQueues and passed
// to PullConsumer.Pull.
type MessageQueue struct {
	Topic  string
	Broker string
	ID     int
}

MessageQueue the queue of the message

func (*MessageQueue) String

func (q *MessageQueue) String() string

type MessageQueueSelector

// MessageQueueSelector selects one message queue; it is passed to
// Producer.SendMessageOrderly together with arg.
// NOTE(review): presumably size is the number of candidate queues and the
// returned int is the index of the chosen queue — confirm.
type MessageQueueSelector interface {
	Select(size int, m *Message, arg interface{}) int
}

MessageQueueSelector select one message queue

type Producer

// Producer is the interface for sending messages. Instances are created
// with NewProducer.
type Producer interface {

	// SendMessageSync sends a message synchronously.
	SendMessageSync(msg *Message) (*SendResult, error)

	// SendMessageOrderly sends the message orderly, using selector (which
	// receives arg) to pick the queue.
	// NOTE(review): autoRetryTimes is presumably the number of automatic
	// retries on failure — confirm.
	SendMessageOrderly(
		msg *Message,
		selector MessageQueueSelector,
		arg interface{},
		autoRetryTimes int) (*SendResult, error)

	// SendMessageOneway sends a message one-way; only an error is returned.
	SendMessageOneway(msg *Message) error

	// SendMessageOrderlyByShardingKey sends the message orderly by shardingkey.
	// NOTE(review): presumably messages sharing a sharding key are routed to
	// the same queue — confirm.
	SendMessageOrderlyByShardingKey(msg *Message, shardingkey string) (*SendResult, error)
	// contains filtered or unexported methods
}

Producer defines the interface for sending messages.

func NewProducer

func NewProducer(config *ProducerConfig) (Producer, error)

NewProducer create a new producer with config

type ProducerConfig

// ProducerConfig defines a producer. It embeds ClientConfig and adds the send
// timeout, compression level, maximum message size and producer model.
// NOTE(review): the units of SendMsgTimeout and MaxMessageSize are not
// visible here — confirm.
type ProducerConfig struct {
	ClientConfig
	SendMsgTimeout int
	CompressLevel  int
	MaxMessageSize int
	ProducerModel  ProducerModel
}

ProducerConfig defines the configuration of a producer.

func (*ProducerConfig) String

func (config *ProducerConfig) String() string

type ProducerModel

// ProducerModel is the producer model: CommonProducer, OrderlyProducer or
// TransProducer.
type ProducerModel int

ProducerModel is the producer model: CommonProducer, OrderlyProducer, or TransProducer.

func (ProducerModel) String

func (mode ProducerModel) String() string

type PullConsumer

// PullConsumer is a consumer that pulls messages. Instances are created with
// NewPullConsumer.
type PullConsumer interface {

	// Pull returns the messages from the consume queue mq, starting at the
	// given offset and limited to maxNums messages. subExpression is the
	// subscription filter expression.
	Pull(mq MessageQueue, subExpression string, offset int64, maxNums int) PullResult

	// FetchSubscriptionMessageQueues returns the consume queues of the topic.
	FetchSubscriptionMessageQueues(topic string) []MessageQueue
	// contains filtered or unexported methods
}

PullConsumer consumer pulling the message

func NewPullConsumer

func NewPullConsumer(config *PullConsumerConfig) (PullConsumer, error)

NewPullConsumer creates a pull consumer

type PullConsumerConfig

// PullConsumerConfig is the configuration for the pull consumer. It embeds
// ClientConfig and adds no fields of its own.
type PullConsumerConfig struct {
	ClientConfig
}

PullConsumerConfig the configuration for the pull consumer

func (*PullConsumerConfig) String

func (config *PullConsumerConfig) String() string

type PullResult

// PullResult is the result returned by PullConsumer.Pull: the pull status,
// the pulled messages, and the next-begin, minimum and maximum offsets.
type PullResult struct {
	NextBeginOffset int64
	MinOffset       int64
	MaxOffset       int64
	Status          PullStatus
	Messages        []*MessageExt
}

PullResult the pull result

func (*PullResult) String

func (pr *PullResult) String() string

type PullStatus

// PullStatus is the status of a pull; see the Pull* constants.
type PullStatus int

PullStatus pull status

func (PullStatus) String

func (ps PullStatus) String() string

type PushConsumer

// PushConsumer is the API of a push consumer. Instances are created with
// NewPushConsumer.
type PushConsumer interface {

	// Subscribe subscribes to a new topic with the specified filter expression
	// and consume function. consumeFunc receives each message and returns a
	// ConsumeStatus.
	Subscribe(topic, expression string, consumeFunc func(msg *MessageExt) ConsumeStatus) error
	// contains filtered or unexported methods
}

PushConsumer apis for PushConsumer

func NewPushConsumer

func NewPushConsumer(config *PushConsumerConfig) (PushConsumer, error)

NewPushConsumer create a new consumer with config.

type PushConsumerConfig

// PushConsumerConfig defines the configuration of a push consumer. It embeds
// ClientConfig and adds the thread count, batch size, message model, consumer
// model and cache limits.
// NOTE(review): MaxCacheMessageSize is presumably a message count and
// MaxCacheMessageSizeInMB a size in megabytes — confirm.
type PushConsumerConfig struct {
	ClientConfig
	ThreadCount             int
	MessageBatchMaxSize     int
	Model                   MessageModel
	ConsumerModel           ConsumerModel
	MaxCacheMessageSize     int
	MaxCacheMessageSizeInMB int
}

PushConsumerConfig defines the configuration of a push consumer.

func (*PushConsumerConfig) String

func (config *PushConsumerConfig) String() string

type SendResult

// SendResult is the result of a send: the send status, the message ID and
// an offset.
type SendResult struct {
	Status SendStatus
	MsgId  string
	Offset int64
}

SendResult status for send

func (*SendResult) String

func (result *SendResult) String() string

type SendStatus

// SendStatus is the status of a send result from the C APIs; see the Send*
// constants.
type SendStatus int

SendStatus The Status for send result from C apis.

func (SendStatus) String

func (status SendStatus) String() string

type SessionCredentials

// SessionCredentials is the access configuration for a client: access key,
// secret key and channel. It is referenced by ClientConfig.Credentials.
type SessionCredentials struct {
	AccessKey string
	SecretKey string
	Channel   string
}

SessionCredentials access config for client

func (*SessionCredentials) String

func (session *SessionCredentials) String() string

type TransactionLocalListener added in v1.2.4

// TransactionLocalListener is the local executor for transaction messages.
// It is passed to NewTransactionProducer. Both methods return a
// TransactionStatus.
// NOTE(review): presumably Execute runs the local transaction for a sent
// message and Check is called to re-check its state later — confirm.
type TransactionLocalListener interface {
	Execute(m *Message, arg interface{}) TransactionStatus
	Check(m *MessageExt, arg interface{}) TransactionStatus
}

TransactionLocalListener is the local executor for transaction messages.

type TransactionProducer added in v1.2.4

// TransactionProducer is the interface for sending transaction messages.
// Instances are created with NewTransactionProducer.
type TransactionProducer interface {

	// SendMessageTransaction sends a transaction message synchronously.
	SendMessageTransaction(msg *Message, arg interface{}) (*SendResult, error)
	// contains filtered or unexported methods
}

func NewTransactionProducer added in v1.2.4

func NewTransactionProducer(config *ProducerConfig, listener TransactionLocalListener, arg interface{}) (TransactionProducer, error)

NewTransactionProducer creates a new transaction producer with the given config, listener and arg.

type TransactionStatus added in v1.2.4

// TransactionStatus is the status returned by a TransactionLocalListener:
// CommitTransaction, RollbackTransaction or UnknownTransaction.
type TransactionStatus int

func (TransactionStatus) String added in v1.2.4

func (status TransactionStatus) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL