body

package
v0.0.0-...-130f5e9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 25, 2018 License: Apache-2.0 Imports: 15 Imported by: 10

Documentation

Index

Constants

View Source
// Property-name constants. Each value repeats its own identifier, except
// PROP_CONSUME_ORDERLY, whose value has no underscore between CONSUME and ORDERLY.
// NOTE(review): presumably these are keys of ConsumerRunningInfo.Properties, and the
// PROP_CONSUMEORDERLY spelling is kept for wire compatibility — confirm before changing.
const (
	PROP_NAMESERVER_ADDR          = "PROP_NAMESERVER_ADDR"
	PROP_THREADPOOL_CORE_SIZE     = "PROP_THREADPOOL_CORE_SIZE"
	PROP_CONSUME_ORDERLY          = "PROP_CONSUMEORDERLY"
	PROP_CONSUME_TYPE             = "PROP_CONSUME_TYPE"
	PROP_CLIENT_VERSION           = "PROP_CLIENT_VERSION"
	PROP_CONSUMER_START_TIMESTAMP = "PROP_CONSUMER_START_TIMESTAMP"
)

Variables

This section is empty.

Functions

func AnalyzeProcessQueue

func AnalyzeProcessQueue(clientId string, info *ConsumerRunningInfo) bool

func AnalyzeRebalance

func AnalyzeRebalance(criTable map[string]*ConsumerRunningInfo) bool

AnalyzeRebalance 参数格式: TreeMap<String/* clientId */, ConsumerRunningInfo> criTable

func AnalyzeSubscription

func AnalyzeSubscription(criTable map[string]*ConsumerRunningInfo) bool

AnalyzeSubscription 分析订阅关系是否相同 参数格式: TreeMap<String/* clientId */, ConsumerRunningInfo> criTable

func FormatString

func FormatString() string

FormatString 格式化

Types

type BrokerStatsData

// BrokerStatsData holds broker statistics as three BrokerStatsItem pointers and
// embeds *protocol.RemotingSerializable.
type BrokerStatsData struct {
	// NOTE(review): presumably the minute / hour / day aggregation windows — confirm.
	StatsMinute *BrokerStatsItem `json:"statsMinute"`
	StatsHour   *BrokerStatsItem `json:"statsHour"`
	StatsDay    *BrokerStatsItem `json:"statsDay"`
	*protocol.RemotingSerializable
}

BrokerStatsData Broker统计数据 Author rongzhihong Since 2017/9/19

func NewBrokerStatsData

func NewBrokerStatsData() *BrokerStatsData

NewBrokerStatsData 初始化BrokerStatsData Author rongzhihong Since 2017/9/19

type BrokerStatsItem

// BrokerStatsItem is the smallest unit of broker statistics data.
type BrokerStatsItem struct {
	Sum   int64   `json:"sum"`
	Tps   float64 `json:"tps"`
	// NOTE(review): "avgpt" is not explained in this file — confirm its meaning.
	Avgpt float64 `json:"avgpt"`
}

BrokerStatsItem Broker统计最小数据单元 Author rongzhihong Since 2017/9/19

type CMResult

// CMResult is an int-based enumeration; it is the type of
// ConsumeMessageDirectlyResult.ConsumeResult.
type CMResult int
// CMResult values, numbered consecutively from 0 by iota in declaration order.
const (
	CR_SUCCESS CMResult = iota // 0
	CR_LATER                   // 1
	CR_ROLLBACK                // 2
	CR_COMMIT                  // 3
	CR_THROW_EXCEPTION         // 4
	CR_RETURN_NULL             // 5
)

func (CMResult) ToString

func (consumeResult CMResult) ToString() string

type ClusterBrokerWapper

// ClusterBrokerWapper is a wrapper that pairs a cluster name with one broker's
// name, address and id.
type ClusterBrokerWapper struct {
	ClusterName string `json:"clusterName"`
	BrokerName  string `json:"brokerName"`
	BrokerAddr  string `json:"brokerAddr"`
	BrokerId    int    `json:"brokerId"`
}

ClusterBrokerWapper cluster与broker包装器 Author: tianyuliang Since: 2017/11/15

func NewClusterBrokerWapper

func NewClusterBrokerWapper(clusterName, brokerName, brokerAddr string, brokerId int) *ClusterBrokerWapper

NewClusterBrokerWapper 初始化 Author: tianyuliang Since: 2017/11/15

func (*ClusterBrokerWapper) ToString

func (wapper *ClusterBrokerWapper) ToString() string

ToString 格式化ClusterBrokerWapper数据 Author: tianyuliang Since: 2017/11/15

type ClusterInfo

// ClusterInfo is an object transferred in the protocol; its content is cluster
// information. It embeds *protocol.RemotingSerializable.
type ClusterInfo struct {
	BrokerAddrTable  map[string]*route.BrokerData `json:"brokerAddrTable"`  // brokerName[BrokerData]
	ClusterAddrTable map[string]set.Set           `json:"clusterAddrTable"` // clusterName[set<brokerName>]
	*protocol.RemotingSerializable
}

ClusterInfo 协议中传输对象,内容为集群信息 Author: tianyuliang Since: 2017/9/4

func NewClusterInfo

func NewClusterInfo() *ClusterInfo

NewClusterInfo 初始化 Author: tianyuliang Since: 2017/9/4

func (*ClusterInfo) RetrieveAllAddrByCluster

func (self *ClusterInfo) RetrieveAllAddrByCluster(clusterName string) []string

RetrieveAllAddrByCluster 处理所有brokerAddr地址 Author: tianyuliang Since: 2017/9/4

func (*ClusterInfo) RetrieveAllClusterNames

func (self *ClusterInfo) RetrieveAllClusterNames() []string

RetrieveAllClusterNames 处理所有clusterName名称 Author: tianyuliang Since: 2017/9/4

type ClusterPlusInfo

// ClusterPlusInfo is an object transferred in the protocol; its content is cluster
// information. It has the same JSON keys as ClusterInfo but stores ClusterAddrTable
// values as []string, because (per the package notes) set.Set cannot be parsed
// during deserialization.
type ClusterPlusInfo struct {
	BrokerAddrTable  map[string]*route.BrokerData `json:"brokerAddrTable"`  // brokerName[BrokerData]
	ClusterAddrTable map[string][]string          `json:"clusterAddrTable"` // clusterName[set<brokerName>]
	*protocol.RemotingSerializable
}

ClusterPlusInfo 协议中传输对象,内容为集群信息

注意: set.Set类型在反序列化过程无法解析,因此额外设置ClusterPlusInfo类型来解析

Author: tianyuliang Since: 2017/9/4

func NewClusterPlusInfo

func NewClusterPlusInfo() *ClusterPlusInfo

NewClusterPlusInfo 初始化 Author: tianyuliang Since: 2017/9/4

func (*ClusterPlusInfo) ResolveClusterBrokerWapper

func (self *ClusterPlusInfo) ResolveClusterBrokerWapper() ([]string, []*ClusterBrokerWapper)

ResolveClusterBrokerWapper 处理所有brokerAddr地址 Author: tianyuliang Since: 2017/9/4

func (*ClusterPlusInfo) RetrieveAllAddrByCluster

func (self *ClusterPlusInfo) RetrieveAllAddrByCluster(clusterName string) ([]string, []*ClusterBrokerWapper)

RetrieveAllAddrByCluster 处理所有brokerAddr地址 Author: tianyuliang Since: 2017/9/4

func (*ClusterPlusInfo) RetrieveAllClusterNames

func (self *ClusterPlusInfo) RetrieveAllClusterNames() []string

RetrieveAllClusterNames 处理所有clusterName名称 Author: tianyuliang Since: 2017/9/4

func (*ClusterPlusInfo) ToClusterInfo

func (plus *ClusterPlusInfo) ToClusterInfo() *ClusterInfo

ToClusterInfo 转化为 ClusterInfo 类型 Author: tianyuliang Since: 2017/11/8

func (*ClusterPlusInfo) ToString

func (plus *ClusterPlusInfo) ToString() string

ToString 格式化 Author: tianyuliang Since: 2017/11/15

type Connection

// Connection holds connection information for one client.
type Connection struct {
	ClientId   string `json:"clientId"`   // client instance
	ClientAddr string `json:"clientAddr"` // client address
	Language   string `json:"language"`   // implementation language
	Version    int32  `json:"version"`    // MQ release version number
}

Connection 连接信息 Author rongzhihong Since 2017/9/19

func NewConnection

func NewConnection(clientId, clientAddr, language string, version int32) *Connection

NewConnection 初始化Connection Author: tianyuliang Since: 2017/11/16

func (*Connection) String

func (self *Connection) String() string

String 格式化Connection结构体的数据 Author: tianyuliang Since: 2017/11/16

type ConsumeMessageDirectlyResult

// ConsumeMessageDirectlyResult carries a CMResult together with flags, a remark and
// a spent-time value, and embeds *protocol.RemotingSerializable.
// Unlike most types in this package, its fields declare no json tags.
type ConsumeMessageDirectlyResult struct {
	Order          bool
	AutoCommit     bool
	ConsumeResult  CMResult
	Remark         string
	SpentTimeMills int64 // NOTE(review): presumably milliseconds — confirm
	*protocol.RemotingSerializable
}

func (*ConsumeMessageDirectlyResult) ToString

func (self *ConsumeMessageDirectlyResult) ToString() string

type ConsumeStatus

// ConsumeStatus holds statistics collected during consumption. It is the value type
// of ConsumerRunningInfo.StatusTable; none of its fields are visible here.
type ConsumeStatus struct {
	// contains filtered or unexported fields
}

ConsumeStatus 消费过程的统计数据 Author: tianyuliang Since: 2017/11/1

type ConsumerConnection

// ConsumerConnection holds consumer connection information and embeds
// *protocol.RemotingSerializable.
type ConsumerConnection struct {
	ConnectionSet     set.Set                    `json:"connectionSet"`     // type: Connection
	SubscriptionTable *sync.Map                  `json:"subscriptionTable"` // topic<*SubscriptionDataPlus>
	ConsumeType       heartbeat.ConsumeType      `json:"consumeType"`
	MessageModel      heartbeat.MessageModel     `json:"messageModel"`
	ConsumeFromWhere  heartbeat.ConsumeFromWhere `json:"consumeFromWhere"`
	*protocol.RemotingSerializable
}

ConsumerConnection 消费者连接信息 Author rongzhihong Since 2017/9/19

func NewConsumerConnection

func NewConsumerConnection() *ConsumerConnection

NewConsumerConnection 初始化 Author rongzhihong Since 2017/9/19

func (*ConsumerConnection) ComputeMinVersion

func (consumerConn *ConsumerConnection) ComputeMinVersion() int32

ComputeMinVersion 计算最小版本号 Author rongzhihong Since 2017/9/19

type ConsumerConnectionPlus

// ConsumerConnectionPlus holds consumer connection information using a slice and a
// plain map in place of ConsumerConnection's set.Set and *sync.Map; it exists to
// work around the set collection not being deserializable.
type ConsumerConnectionPlus struct {
	ConnectionSet     []*Connection                              `json:"connectionSet"`     // type: Connection
	SubscriptionTable map[string]*heartbeat.SubscriptionDataPlus `json:"subscriptionTable"` // topic<*SubscriptionDataPlus>
	ConsumeType       heartbeat.ConsumeType                      `json:"consumeType"`
	MessageModel      heartbeat.MessageModel                     `json:"messageModel"`
	ConsumeFromWhere  heartbeat.ConsumeFromWhere                 `json:"consumeFromWhere"`
	*protocol.RemotingSerializable
}

ConsumerConnectionPlus 消费者连接信息(处理set集合无法反序列化问题) Author rongzhihong Since 2017/9/19

func NewConsumerConnectionPlus

func NewConsumerConnectionPlus() *ConsumerConnectionPlus

NewConsumerConnectionPlus 初始化 Author rongzhihong Since 2017/9/19

func (*ConsumerConnectionPlus) ToConsumerConnection

func (plus *ConsumerConnectionPlus) ToConsumerConnection() *ConsumerConnection

ToConsumerConnection 转化为ConsumerConnection Author: tianyuliang Since: 2017/11/13

type ConsumerOffsetSerializeWrapper

// ConsumerOffsetSerializeWrapper is the serialization wrapper for consumer consume
// progress (offsets). It embeds *protocol.RemotingSerializable.
type ConsumerOffsetSerializeWrapper struct {
	OffsetTable *sync.Map `json:"offsetTable"` // key topic@group value:map[int]int64
	*protocol.RemotingSerializable
}

ConsumerOffsetSerializeWrapper Consumer消费进度,序列化包装 Author gaoyanlei Since 2017/8/22

func NewConsumerOffsetSerializeWrapper

func NewConsumerOffsetSerializeWrapper() *ConsumerOffsetSerializeWrapper

NewConsumerOffsetSerializeWrapper 初始化 Author gaoyanlei Since 2017/8/22

type ConsumerRunningInfo

// ConsumerRunningInfo is the consumer's internal runtime data structure. It embeds
// *protocol.RemotingSerializable; its fields declare no json tags.
type ConsumerRunningInfo struct {
	Properties      map[string]interface{}                      // assorted configuration and runtime data
	SubscriptionSet set.Set                                     // TreeSet<SubscriptionData>: the subscriptions
	MqTable         map[*message.MessageQueue]*ProcessQueueInfo // MessageQueue -> ProcessQueueInfo: consume progress, rebalance and internal consume-queue info
	StatusTable     map[string]*ConsumeStatus                   // TreeMap[Topic]ConsumeStatus: RT and TPS statistics
	JstackEnable    string                                      // the jstack result
	*protocol.RemotingSerializable
}

ConsumerRunningInfo Consumer内部数据结构 Author: tianyuliang Since: 2017/11/1

func NewConsumerRunningInfo

func NewConsumerRunningInfo() *ConsumerRunningInfo

type GetConsumerStatusBody

// GetConsumerStatusBody is the body used when getting consume status. It embeds
// *protocol.RemotingSerializable.
type GetConsumerStatusBody struct {
	// Maps keyed by *message.MessageQueue use pointer identity for key equality.
	// NOTE(review): the int64 values are presumably offsets, and the outer
	// ConsumerTable key presumably a client id — confirm.
	MessageQueueTable map[*message.MessageQueue]int64            `json:"messageQueueTable"`
	ConsumerTable     map[string]map[*message.MessageQueue]int64 `json:"consumerTable"`
	*protocol.RemotingSerializable
}

GetConsumerStatusBody 获得消费状态的body Author rongzhihong Since 2017/9/19

func NewGetConsumerStatusBody

func NewGetConsumerStatusBody() *GetConsumerStatusBody

type GroupList

// GroupList is a collection of groups held in a set.Set. It embeds
// *protocol.RemotingSerializable.
type GroupList struct {
	GroupList set.Set `json:"groupList"`
	*protocol.RemotingSerializable
}

GroupList 分组集合 Author rongzhihong Since 2017/9/19

func NewGroupList

func NewGroupList() *GroupList

NewGroupList 初始化 Author: tianyuliang Since: 2017/11/1

func (*GroupList) ToString

func (self *GroupList) ToString() string

ToString 打印结构信息 Author: tianyuliang Since: 2017/11/1

type KVTable

// KVTable wraps a string-to-string map tagged with the JSON key "table" and embeds
// *protocol.RemotingSerializable.
type KVTable struct {
	Table map[string]string `json:"table"`
	*protocol.RemotingSerializable
}

func NewKVTable

func NewKVTable() *KVTable

type LockBatchRequestBody

// LockBatchRequestBody is the request sent to lock a batch of queues.
type LockBatchRequestBody struct {
	ConsumerGroup string  `json:"consumerGroup"`
	ClientId      string  `json:"clientId"`
	// NOTE(review): this tag is "mq_set" while UnlockBatchRequestBody.MqSet uses
	// "mqSet"; confirm which key the peer expects before relying on either.
	MqSet         set.Set `json:"mq_set"`
}

LockBatchRequestBody 锁队列请求头 Author rongzhihong Since 2017/9/19

func NewLockBatchRequestBody

func NewLockBatchRequestBody() *LockBatchRequestBody

type LockBatchResponseBody

// LockBatchResponseBody is the response to a batch queue-lock request.
type LockBatchResponseBody struct {
	// NOTE(review): presumably the set of message queues that were locked
	// successfully — confirm.
	LockOKMQSet set.Set `json:"lockOKMQSet"`
}

LockBatchResponseBody 锁队列响应头 Author rongzhihong Since 2017/9/19

func NewLockBatchResponseBody

func NewLockBatchResponseBody() *LockBatchResponseBody

type LockEntry

// LockEntry records a client id and a last-update timestamp; its methods IsLocked
// and IsExpired are declared on *LockEntry.
type LockEntry struct {
	ClientId            string
	LastUpdateTimestamp int64 // NOTE(review): time unit is not shown in this file — confirm
}

LockEntry 锁信息(记录ClientId与LastUpdateTimestamp) Author: rongzhihong Since: 2017/9/19

func NewLockEntry

func NewLockEntry() *LockEntry

func (*LockEntry) IsExpired

func (entry *LockEntry) IsExpired() bool

func (*LockEntry) IsLocked

func (entry *LockEntry) IsLocked(clientId string) bool

type ProcessQueueInfo

// ProcessQueueInfo holds information about the internal consume queue.
type ProcessQueueInfo struct {
	CommitOffset int64 // how far consumption has progressed: the committed offset

	// Offset information for cached messages.
	CachedMsgMinOffset int64
	CachedMsgMaxOffset int64
	CachedMsgCount     int

	// Messages currently inside a transaction.
	TransactionMsgMinOffset int64
	TransactionMsgMaxOffset int64
	TransactionMsgCount     int

	// State information for ordered messages.
	Locked            bool
	TryUnlockTimes    int64
	LastLockTimestamp int64

	// State information about the most recent consumption.
	Droped               bool
	LastPullTimestamp    int64
	LastConsumeTimestamp int64
}

ProcessQueueInfo 内部消费队列的信息 Author: tianyuliang Since: 2017/11/1

func (*ProcessQueueInfo) ToString

func (p *ProcessQueueInfo) ToString() string

ToString 显示内部消费队列的信息 Author: tianyuliang Since: 2017/11/1

type ProducerConnection

// ProducerConnection holds producer connections in a set.Set and embeds
// *protocol.RemotingSerializable.
type ProducerConnection struct {
	ConnectionSet set.Set `json:"connectionSet"`
	*protocol.RemotingSerializable
}

ProducerConnection 生产者连接 Author rongzhihong Since 2017/9/19

func NewProducerConnection

func NewProducerConnection() *ProducerConnection

NewProducerConnection 初始化 Author rongzhihong Since 2017/9/19

type QueryConsumeTimeSpanBody

// QueryConsumeTimeSpanBody is the body for querying the consume time span.
type QueryConsumeTimeSpanBody struct {
	// NOTE(review): presumably a set of QueueTimeSpan values — confirm.
	ConsumeTimeSpanSet set.Set `json:"consumeTimeSpanSet"`
}

QueryConsumeTimeSpanBody 查询消费时间跨度 Author rongzhihong Since 2017/9/19

func NewQueryConsumeTimeSpanBody

func NewQueryConsumeTimeSpanBody() *QueryConsumeTimeSpanBody

type QueryCorrectionOffsetBody

// QueryCorrectionOffsetBody is the returned content when looking up corrected
// offsets (forwarding component).
type QueryCorrectionOffsetBody struct {
	// NOTE(review): the int key is presumably a queue id — confirm.
	CorrectionOffsets map[int]int64 `json:"correctionOffsets"`
}

QueryCorrectionOffsetBody 查找被修正 offset (转发组件)的返回内容 Author rongzhihong Since 2017/9/19

func NewQueryCorrectionOffsetBody

func NewQueryCorrectionOffsetBody() *QueryCorrectionOffsetBody

type QueueTimeSpan

// QueueTimeSpan describes the time span of one message queue. Each timestamp has a
// matching Get...Str method declared on *QueueTimeSpan.
type QueueTimeSpan struct {
	MessageQueue     *message.MessageQueue `json:"messageQueue"`
	MinTimeStamp     int64                 `json:"minTimeStamp"`     // start time
	MaxTimeStamp     int64                 `json:"maxTimeStamp"`     // end time
	ConsumeTimeStamp int64                 `json:"consumeTimeStamp"` // consume time
}

QueueTimeSpan 查询时间宽度 Author rongzhihong Since 2017/9/19

func (*QueueTimeSpan) GetConsumeTimeStampStr

func (timespan *QueueTimeSpan) GetConsumeTimeStampStr() string

GetConsumeTimeStampStr 消费时间 Author rongzhihong Since 2017/9/19

func (*QueueTimeSpan) GetMaxTimeStampStr

func (timespan *QueueTimeSpan) GetMaxTimeStampStr() string

GetMaxTimeStampStr 终止时间 Author rongzhihong Since 2017/9/19

func (*QueueTimeSpan) GetMinTimeStampStr

func (timespan *QueueTimeSpan) GetMinTimeStampStr() string

GetMinTimeStampStr 起始时间 Author rongzhihong Since 2017/9/19

type RegisterBrokerBody

// RegisterBrokerBody is the request/response body for registering a broker. It
// embeds *protocol.RemotingSerializable.
type RegisterBrokerBody struct {
	TopicConfigSerializeWrapper *TopicConfigSerializeWrapper `json:"topicConfigSerializeWrapper"`
	FilterServerList            []string                     `json:"filterServerList"`
	*protocol.RemotingSerializable
}

RegisterBrokerBody 注册Broker-请求/响应体 Author gaoyanlei Since 2017/8/22

func NewRegisterBrokerBody

func NewRegisterBrokerBody(topicConfigWrapper *TopicConfigSerializeWrapper, filterServerList []string) *RegisterBrokerBody

type ResetOffsetBody

// ResetOffsetBody is the body used when resetting offsets.
type ResetOffsetBody struct {
	// Keyed by *message.MessageQueue, so key equality is pointer identity.
	OffsetTable map[*message.MessageQueue]int64 `json:"offsetTable"`
}

ResetOffsetBody 重置偏移量的body Author rongzhihong Since 2017/9/18

func NewResetOffsetBody

func NewResetOffsetBody() *ResetOffsetBody

type SubscriptionGroupWrapper

// SubscriptionGroupWrapper is the serialization wrapper for subscription group
// configuration. It embeds *protocol.RemotingSerializable.
type SubscriptionGroupWrapper struct {
	SubscriptionGroupTable *sync.Map             `json:"subscriptionGroupTable"`
	// Held by value here, whereas TopicConfigSerializeWrapper.DataVersion is a pointer.
	DataVersion            stgcommon.DataVersion `json:"dataVersion"`
	*protocol.RemotingSerializable
}

SubscriptionGroupWrapper 订阅组配置,序列化包装 Author gaoyanlei Since 2017/8/22

func NewSubscriptionGroupWrapper

func NewSubscriptionGroupWrapper() *SubscriptionGroupWrapper

type TopicBrokerClusterWapper

// TopicBrokerClusterWapper groups a cluster name, a topic name and a
// *TopicUpdateConfigWapper. Note the JSON keys differ from the field names:
// TopicName is tagged "topic" and TopicUpdateConfigWapper is tagged "topicConfig".
type TopicBrokerClusterWapper struct {
	ClusterName             string                   `json:"clusterName"`
	TopicName               string                   `json:"topic"`
	TopicUpdateConfigWapper *TopicUpdateConfigWapper `json:"topicConfig"`
}

func NewTopicBrokerClusterWapper

func NewTopicBrokerClusterWapper(clusterName, topicName string, queueData *route.QueueData) *TopicBrokerClusterWapper

type TopicConfigSerializeWrapper

// TopicConfigSerializeWrapper bundles a *TopicConfigTable with a
// *stgcommon.DataVersion and embeds *protocol.RemotingSerializable.
type TopicConfigSerializeWrapper struct {
	TopicConfigTable *TopicConfigTable      `json:"topicConfigTable"`
	DataVersion      *stgcommon.DataVersion `json:"dataVersion"`
	*protocol.RemotingSerializable
}

TopicConfigSerializeWrapper topic Author gaoyanlei Since 2017/8/11

func NewTopicConfigSerializeWrapper

func NewTopicConfigSerializeWrapper(dataVersion ...*stgcommon.DataVersion) *TopicConfigSerializeWrapper

NewTopicConfigSerializeWrapper 初始化 Author: tianyuliang Since: 2017/10/21

func (*TopicConfigSerializeWrapper) ToString

func (self *TopicConfigSerializeWrapper) ToString() string

ToString 格式化 Author: tianyuliang Since: 2017/10/21

type TopicConfigTable

// TopicConfigTable is a map from string keys to *stgcommon.TopicConfig with an
// embedded sync.RWMutex. Because it contains a mutex, values of this type should
// not be copied; all its methods are declared on *TopicConfigTable.
type TopicConfigTable struct {
	TopicConfigs map[string]*stgcommon.TopicConfig `json:"topicConfigTable"`
	// The embedded lock is excluded from JSON by the "-" tag.
	sync.RWMutex `json:"-"`
}

func NewTopicConfigTable

func NewTopicConfigTable() *TopicConfigTable

func (*TopicConfigTable) Clear

func (table *TopicConfigTable) Clear()

Clear 清空map author rongzhihong since 2017/9/18

func (*TopicConfigTable) ClearAndPutAll

func (table *TopicConfigTable) ClearAndPutAll(topicConfigTable map[string]*stgcommon.TopicConfig)

ClearAndPutAll 清空map后,再putAll author rongzhihong since 2017/9/18

func (*TopicConfigTable) Foreach

func (table *TopicConfigTable) Foreach(fn func(k string, v *stgcommon.TopicConfig))

func (*TopicConfigTable) ForeachUpdate

func (table *TopicConfigTable) ForeachUpdate(fn func(k string, v *stgcommon.TopicConfig))

func (*TopicConfigTable) Get

func (*TopicConfigTable) Keys

func (self *TopicConfigTable) Keys() []string

func (*TopicConfigTable) Put

func (*TopicConfigTable) PutAll

func (table *TopicConfigTable) PutAll(topicConfigTable map[string]*stgcommon.TopicConfig)

PutAll put all author rongzhihong since 2017/9/18

func (*TopicConfigTable) Remove

func (table *TopicConfigTable) Remove(k string) *stgcommon.TopicConfig

func (*TopicConfigTable) Size

func (table *TopicConfigTable) Size() int

func (*TopicConfigTable) ToString

func (self *TopicConfigTable) ToString() string

ToString 打印TopicConfigTable结构体的数据 Author: tianyuliang Since: 2017/10/3

type TopicList

// TopicList is a list of topics together with a broker address. It embeds
// *protocol.RemotingSerializable.
type TopicList struct {
	TopicList  set.Set `json:"topicList"`  // topic list
	BrokerAddr string  `json:"brokerAddr"` // broker address
	*protocol.RemotingSerializable
}

TopicList topic列表 Author: tianyuliang Since: 2017/9/16

func NewTopicList

func NewTopicList() *TopicList

NewTopicList 初始化 Author: tianyuliang Since: 2017/9/16

type TopicPlusList

// TopicPlusList is an extended topic list: it stores topics as []string and adds
// TopicQueueTable and ClusterAddrTable. It embeds *protocol.RemotingSerializable.
type TopicPlusList struct {
	TopicList        []string                      `json:"topicList"`        // topic list
	BrokerAddr       string                        `json:"brokerAddr"`       // broker address
	TopicQueueTable  map[string][]*route.QueueData `json:"topicQueueTable"`  // additional field: topic<*route.QueueData>
	ClusterAddrTable map[string][]string           `json:"clusterAddrTable"` // clusterName[set<brokerName>]
	*protocol.RemotingSerializable
}

TopicPlusList 拓展Topic列表 Author: tianyuliang Since: 2017/9/16

func NewTopicPlusList

func NewTopicPlusList() *TopicPlusList

NewTopicPlusList 初始化 Author: tianyuliang Since: 2017/9/16

type TopicUpdateConfigWapper

// TopicUpdateConfigWapper is a flat record of one topic's settings (queue counts,
// permission, flags) together with the cluster and broker it refers to. It is the
// type of TopicBrokerClusterWapper.TopicUpdateConfigWapper.
type TopicUpdateConfigWapper struct {
	TopicName      string `json:"topicName"`
	ClusterName    string `json:"clusterName"`
	Order          bool   `json:"order"`
	WriteQueueNums int    `json:"writeQueueNums"`
	ReadQueueNums  int    `json:"readQueueNums"`
	BrokerAddr     string `json:"brokerAddr"`
	BrokerId       int    `json:"brokerId"`
	BrokerName     string `json:"brokerName"`
	Unit           bool   `json:"unit"`
	Perm           int    `json:"perm"`
	TopicSynFlag   int    `json:"topicSynFlag"`
}

func NewTopicUpdateConfigWapper

func NewTopicUpdateConfigWapper(clusterName, topicName string, queueData *route.QueueData) *TopicUpdateConfigWapper

type UnlockBatchRequestBody

// UnlockBatchRequestBody is the request sent to unlock a batch of queues.
type UnlockBatchRequestBody struct {
	ConsumerGroup string  `json:"consumerGroup"`
	ClientId      string  `json:"clientId"`
	// Tagged "mqSet"; note LockBatchRequestBody.MqSet is tagged "mq_set" instead.
	MqSet         set.Set `json:"mqSet"`
}

UnlockBatchRequestBody 解锁队列请求头 Author rongzhihong Since 2017/9/19

func NewUnlockBatchRequestBody

func NewUnlockBatchRequestBody() *UnlockBatchRequestBody

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL