Documentation ¶
Index ¶
- func EnableKafkaConsumerTestUtil() fx.Option
- func HandleMessage(consumerName string, message []byte)
- func MessageCollectorOpt() fx.Option
- func NewMessageCollectorHandler(collector *MessageCollector) core.ConsumerHandler
- func ResetKafkaConsumerGroupOpt() fx.Option
- type MessageCollector
- type MessageCollectorHandler
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func HandleMessage ¶
HandleMessage handles a message.
func MessageCollectorOpt ¶
func NewMessageCollectorHandler ¶
func NewMessageCollectorHandler(collector *MessageCollector) core.ConsumerHandler
Types ¶
type MessageCollector ¶
type MessageCollector struct {
// contains filtered or unexported fields
}
func NewMessageCollector ¶
func NewMessageCollector() *MessageCollector
func (*MessageCollector) ClearMessages ¶
func (k *MessageCollector) ClearMessages(topic string)
func (*MessageCollector) Count ¶
func (k *MessageCollector) Count(topic string) int64
func (*MessageCollector) GetMessages ¶
func (k *MessageCollector) GetMessages(topic string) []string
func (*MessageCollector) PushMessage ¶
func (k *MessageCollector) PushMessage(message *core.ConsumerMessage)
type MessageCollectorHandler ¶
type MessageCollectorHandler struct {
// contains filtered or unexported fields
}
func (MessageCollectorHandler) Close ¶
func (c MessageCollectorHandler) Close()
func (*MessageCollectorHandler) HandlerFunc ¶
func (c *MessageCollectorHandler) HandlerFunc(msg *core.ConsumerMessage)
Click to show internal directories.
Click to hide internal directories.