Documentation
¶
Index ¶
- Constants
- Variables
- func TrimSuffix(s, suffix string) string
- type AllocateMessageQueueStrategy
- type ConsumeConcurrentlyStatus
- type Consumer
- type ConsumerConfig
- type DefaultConsumer
- func (c *DefaultConsumer) Callback(f func())
- func (c *DefaultConsumer) RegisterMessageListener(topic string, listener func(message Message) ConsumeConcurrentlyStatus)
- func (c *DefaultConsumer) Shutdown()
- func (c *DefaultConsumer) ShutdownCallback(callback func())
- func (c *DefaultConsumer) Start() error
- func (c *DefaultConsumer) Subscribe(topic string) bool
- func (c *DefaultConsumer) Unsubscribe(topics ...string)
- type DefaultProducer
- type FactoryConfig
- type Message
- type MessageQueue
- type MessageQueueSelector
- type Monitor
- type MonitorListener
- type Mq
- type MqConfig
- type MqFactory
- func (m *MqFactory) GetConsumer() Consumer
- func (m *MqFactory) GetConsumerByConfig(config *ConsumerConfig) Consumer
- func (m *MqFactory) GetConsumerByGroup(groupName string) Consumer
- func (m *MqFactory) GetProducer() Producer
- func (m *MqFactory) GetProducerByConfig(config *ProducerConfig) Producer
- func (m *MqFactory) GetProducerByGroup(groupName string) Producer
- type Msg
- type MsgQueueSelector
- type Pool
- type Producer
- type ProducerConfig
- type SelectMessageQueueByPolling
- type SelectMessageQueueByRandom
- type ServiceState
- type ToPicConfig
- type TopicPublishInfo
Constants ¶
View Source
const ( DefaultProducerGroup = "DEFAULT_PRODUCER" DefaultConsumerGroup = "DEFAULT_CONSUMER" )
View Source
const ( DefaultMessageQueueLength = 5 DefaultMessageCapLength = 1 DefaultPoolSize = 5 )
Variables ¶
View Source
var ( TopicEmpty = errors.New("please specify a topic to send messages ") ServiceStateNoStart = errors.New("producer service start createJust for not send messages") )
View Source
var ( // ErrShutdownAlready the producer service state not OK, maybe started once ErrShutdownAlready = errors.New("the producer service state not OK, maybe started once") // ConsumerConfigEmpty Consumer configuration required for MQ is empty ConsumerConfigEmpty = errors.New("consumer configuration required for MQ is empty") //ConsumerConfigMessageListenersEmpty consumer configuration MessageListeners required for MQ is empty ConsumerConfigMessageListenersEmpty = errors.New("consumer configuration MessageListeners required for MQ is empty") )
Functions ¶
func TrimSuffix ¶
Types ¶
type AllocateMessageQueueStrategy ¶
type AllocateMessageQueueStrategy interface {
// Name The strategy name
Name() string
// Allocate To allocate result of given strategy
Allocate() []MessageQueue
}
AllocateMessageQueueStrategy Strategy Algorithm for message allocating between consumers
type ConsumeConcurrentlyStatus ¶
type ConsumeConcurrentlyStatus int
const ( // ConsumeSuccess Success consumption ConsumeSuccess ConsumeConcurrentlyStatus = iota // ReconsumeLater Failure consumption,later try to consume ReconsumeLater )
type Consumer ¶
type Consumer interface {
// Start the consumer with the given parameters and return immediately
Start() error
// Shutdown Stop the consumer with the given parameters and return immediately
Shutdown()
ShutdownCallback(callback func())
// Subscribe with the given parameters and return immediately
Subscribe(topic string) bool
// Unsubscribe with the given parameters and return immediately
Unsubscribe(topics ...string)
// RegisterMessageListener with the given parameters and return immediately and
RegisterMessageListener(topic string, l func(message Message) ConsumeConcurrentlyStatus)
}
type ConsumerConfig ¶
type ConsumerConfig struct {
PoolSize int
ConsumerGroup string
MessageListeners map[string]func(message Message) ConsumeConcurrentlyStatus
}
type DefaultConsumer ¶
type DefaultConsumer struct {
ConsumerGroup string
ServiceState ServiceState
Listeners map[string]func(message Message) ConsumeConcurrentlyStatus
// contains filtered or unexported fields
}
func (*DefaultConsumer) Callback ¶
func (c *DefaultConsumer) Callback(f func())
func (*DefaultConsumer) RegisterMessageListener ¶
func (c *DefaultConsumer) RegisterMessageListener(topic string, listener func(message Message) ConsumeConcurrentlyStatus)
func (*DefaultConsumer) Shutdown ¶
func (c *DefaultConsumer) Shutdown()
func (*DefaultConsumer) ShutdownCallback ¶
func (c *DefaultConsumer) ShutdownCallback(callback func())
func (*DefaultConsumer) Start ¶
func (c *DefaultConsumer) Start() error
func (*DefaultConsumer) Subscribe ¶
func (c *DefaultConsumer) Subscribe(topic string) bool
func (*DefaultConsumer) Unsubscribe ¶
func (c *DefaultConsumer) Unsubscribe(topics ...string)
type DefaultProducer ¶
type DefaultProducer struct {
ProducerGroup string
ServiceState ServiceState
MqFactory *MqFactory
}
func (*DefaultProducer) Send ¶
func (p *DefaultProducer) Send(msg Message) error
func (*DefaultProducer) Shutdown ¶
func (p *DefaultProducer) Shutdown()
func (*DefaultProducer) Start ¶
func (p *DefaultProducer) Start() error
type FactoryConfig ¶
type FactoryConfig struct {
ToPicConfigs []*ToPicConfig
//Topic 队列选择器 选填默认随机 优先ToPicConfigs 中
Selector MsgQueueSelector
}
type MessageQueue ¶
type MessageQueueSelector ¶
type MessageQueueSelector interface {
Select(messageQueues []*MessageQueue) *MessageQueue
}
func MessageQueueSelectorCreate ¶
func MessageQueueSelectorCreate(selector MsgQueueSelector) MessageQueueSelector
type Monitor ¶
type Monitor struct {
// contains filtered or unexported fields
}
func NewMonitor ¶ added in v0.3.0
func (*Monitor) CloseMonitor ¶
func (m *Monitor) CloseMonitor()
func (*Monitor) Surround ¶ added in v0.3.0
func (m *Monitor) Surround(listener func(message Message) ConsumeConcurrentlyStatus, msg Message)
func (*Monitor) TurnMonitor ¶
func (m *Monitor) TurnMonitor()
type MonitorListener ¶
type MonitorListener interface {
Surround(messageListener func(message Message) ConsumeConcurrentlyStatus, msg Message)
Print()
Info() map[string]*Msg
TurnMonitor()
CloseMonitor()
}
type Mq ¶
type Mq struct {
Consumer Consumer
Producer Producer
MonitorListener MonitorListener
}
type MqConfig ¶
type MqConfig struct {
//Topic 队列选择器 //默认随机 ToPicConfig 不配置 默认去mq
Selector MsgQueueSelector
ProducerConfig *ProducerConfig
ConsumerConfig *ConsumerConfig
ToPicConfigs []*ToPicConfig
DefaultTopicPoolSize int
}
type MqFactory ¶
type MqFactory struct {
TopicPublishInfoTable map[string]*TopicPublishInfo
// contains filtered or unexported fields
}
func Instance ¶
func Instance(config *FactoryConfig) *MqFactory
func (*MqFactory) GetConsumer ¶
GetConsumer 获取一个topic 对应的消费者 采用延迟初始化
func (*MqFactory) GetConsumerByConfig ¶
func (m *MqFactory) GetConsumerByConfig(config *ConsumerConfig) Consumer
func (*MqFactory) GetConsumerByGroup ¶
GetConsumer 获取一个topic 对应的消费者 采用延迟初始化
func (*MqFactory) GetProducer ¶
GetProducer 获取一个topic 对应的生产者 采用延迟初始化
func (*MqFactory) GetProducerByConfig ¶
func (m *MqFactory) GetProducerByConfig(config *ProducerConfig) Producer
func (*MqFactory) GetProducerByGroup ¶
GetProducer 获取一个topic 对应的生产者 采用延迟初始化
type Msg ¶
type Msg struct {
// topicName name for monitoring
TopicName string
// consumerGroup name for monitoring
ConsumerGroup string
// taskCount task count for monitoring
TaskCount int64
// timeCount time count for monitoring
TimeCount int64
// maxTime max time task for monitoring
MaxTime int64
// minTime min time task for monitoring
MinTime int64
Running int64
Waiting int64
// started
StartTime time.Time
//
LastTaskTime int64
// contains filtered or unexported fields
}
type MsgQueueSelector ¶
type MsgQueueSelector int
const ( // Random 随机 Random MsgQueueSelector = iota Polling )
type ProducerConfig ¶
type ProducerConfig struct {
ProducerGroupName string
}
type SelectMessageQueueByPolling ¶
type SelectMessageQueueByPolling struct {
// contains filtered or unexported fields
}
func (*SelectMessageQueueByPolling) Select ¶
func (s *SelectMessageQueueByPolling) Select(messageQueues []*MessageQueue) *MessageQueue
Select 通过轮询选择消息队列
type SelectMessageQueueByRandom ¶
type SelectMessageQueueByRandom struct{}
func (*SelectMessageQueueByRandom) Select ¶
func (s *SelectMessageQueueByRandom) Select(messageQueues []*MessageQueue) *MessageQueue
Select 随机选择消息队列
type ServiceState ¶
type ServiceState int
const ( // CreateJust Service just created,not start CreateJust ServiceState = iota // Running Service Running Running // ShutdownAlready Service shutdown ShutdownAlready // StartFailed Service Start failure StartFailed )
type ToPicConfig ¶
type ToPicConfig struct {
//名称
TopicName string
//队列长度
MessageQueueLength int
//队列缓存容量
MessageCapLength int
//Topic 队列选择器 //默认随机
Selector MsgQueueSelector
// 可用线程数
PoolSize int
// contains filtered or unexported fields
}
ToPicConfig Topic 配置
func NewToPicConfig ¶
func NewToPicConfig(topicName string) *ToPicConfig
func NewToPicConfigByPoolSize ¶ added in v0.3.0
func NewToPicConfigByPoolSize(topicName string, DefaultTopicPoolSize int) *ToPicConfig
type TopicPublishInfo ¶
type TopicPublishInfo struct {
ToPicConfig *ToPicConfig
MessageQueueSelector MessageQueueSelector
// contains filtered or unexported fields
}
func (*TopicPublishInfo) TopicBlockageMessageQueueCount ¶
func (topic *TopicPublishInfo) TopicBlockageMessageQueueCount() int64
Source Files
¶
Click to show internal directories.
Click to hide internal directories.