Documentation ¶
Index ¶
- Constants
- func ConsumerMessageToMap(m *sarama.ConsumerMessage) map[string]interface{}
- type ClientConfig
- type Config
- type ConsumerGroup
- func (s *ConsumerGroup) Cleanup(session sarama.ConsumerGroupSession) error
- func (s *ConsumerGroup) Close() error
- func (s *ConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (s *ConsumerGroup) SetHandler(h Handler)
- func (s *ConsumerGroup) Setup(session sarama.ConsumerGroupSession) error
- func (s *ConsumerGroup) Start()
- type ConsumerGroupConfig
- type Handler
- type Message
- type Producer
- type ProducerConfig
- type RetryConfig
Constants ¶
View Source
const ( PackageNameProducer = "fkafka.producer" PackageNameConsumerGroup = "fkafka.consumerGroup" )
View Source
const ( CodeOK = "OK" CodeError = "Error" )
Variables ¶
This section is empty.
Functions ¶
func ConsumerMessageToMap ¶
func ConsumerMessageToMap(m *sarama.ConsumerMessage) map[string]interface{}
Types ¶
type ClientConfig ¶
type Config ¶
type Config struct { Debug bool EnableAccessInterceptorReq bool // 是否开启记录 publish 消息,默认开启 EnableAccessInterceptorRes bool // 是否开启记录 consumer 消费消息, 默认开启 ClientConfig ClientConfig ProducerConfig ProducerConfig ConsumerGroupConfigs map[string]ConsumerGroupConfig }
func DefaultConfig ¶
func DefaultConfig() *Config
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
func NewConsumerGroup ¶
func NewConsumerGroup(name string, config *Config, groupConfig *ConsumerGroupConfig) (*ConsumerGroup, error)
func (*ConsumerGroup) Cleanup ¶
func (s *ConsumerGroup) Cleanup(session sarama.ConsumerGroupSession) error
func (*ConsumerGroup) Close ¶
func (s *ConsumerGroup) Close() error
func (*ConsumerGroup) ConsumeClaim ¶
func (s *ConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (*ConsumerGroup) SetHandler ¶
func (s *ConsumerGroup) SetHandler(h Handler)
func (*ConsumerGroup) Setup ¶
func (s *ConsumerGroup) Setup(session sarama.ConsumerGroupSession) error
func (*ConsumerGroup) Start ¶
func (s *ConsumerGroup) Start()
type ConsumerGroupConfig ¶
type ConsumerGroupConfig struct { Topics []string GroupID string InitialOffset string // 初始化 offset, oldest / newest, 默认 oldest RetryConfig RetryConfig }
type Message ¶
type Message struct { Topic string Key []byte Value []byte // The headers are key-value pairs that are transparently passed // by Kafka between producers and consumers. Headers []sarama.RecordHeader // This field is used to hold arbitrary data you wish to include, so it // will be available when receiving on the Successes and Errors channels. // Sarama completely ignores this field and is only to be used for // pass-through data. Metadata interface{} }
Message sarama.ProducerMessage for kafka publish
type ProducerConfig ¶
type ProducerConfig struct {
MaxMessageBytes int
}
type RetryConfig ¶
type RetryConfig struct {
MaxRetries int64 // consumer 消费重试次数, 默认 0 不重试
}
Click to show internal directories.
Click to hide internal directories.