Documentation
Index
Constants
This section is empty.
Variables
This section is empty.
Functions
This section is empty.
Types
type KafkaSource
type KafkaSource struct {
	source.SourceBase
}
func NewKafkaSource
func NewKafkaSource(kconf *KafkaSourceConf) *KafkaSource
func (*KafkaSource) Cleanup
func (k *KafkaSource) Cleanup(session sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
func (*KafkaSource) ConsumeClaim
func (k *KafkaSource) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
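The consumer loop described above can be sketched as follows. This is a minimal illustration, not KafkaSource's actual implementation: `message`, `claim`, and `session` are hypothetical stand-ins for `sarama.ConsumerMessage`, `sarama.ConsumerGroupClaim`, and `sarama.ConsumerGroupSession`, trimmed to the two methods the loop uses so the example runs without a broker. The `handle` callback and the fake types are also assumptions made for the example.

```go
package main

import "fmt"

// message is a hypothetical stand-in for sarama.ConsumerMessage.
type message struct {
	Topic     string
	Partition int32
	Offset    int64
	Value     []byte
}

// claim is a hypothetical stand-in for sarama.ConsumerGroupClaim.
type claim interface {
	Messages() <-chan *message
}

// session is a hypothetical stand-in for sarama.ConsumerGroupSession.
type session interface {
	MarkMessage(msg *message, metadata string)
}

// consumeClaim ranges over the claim's message channel, passes each message
// to handle, and marks it as consumed only after handle succeeds. The loop
// ends when the channel is closed or when handle returns an error.
func consumeClaim(s session, c claim, handle func(*message) error) error {
	for msg := range c.Messages() {
		if err := handle(msg); err != nil {
			return err
		}
		s.MarkMessage(msg, "")
	}
	return nil
}

// fakeClaim serves messages from an in-memory channel.
type fakeClaim struct{ ch chan *message }

func (f *fakeClaim) Messages() <-chan *message { return f.ch }

// fakeSession records the offsets that were marked.
type fakeSession struct{ marked []int64 }

func (f *fakeSession) MarkMessage(msg *message, _ string) {
	f.marked = append(f.marked, msg.Offset)
}

func main() {
	c := &fakeClaim{ch: make(chan *message, 2)}
	c.ch <- &message{Topic: "events", Offset: 0, Value: []byte("a")}
	c.ch <- &message{Topic: "events", Offset: 1, Value: []byte("b")}
	close(c.ch) // closing the channel ends the consumer loop

	s := &fakeSession{}
	err := consumeClaim(s, c, func(m *message) error {
		fmt.Printf("%s@%d: %s\n", m.Topic, m.Offset, m.Value)
		return nil
	})
	fmt.Println("marked offsets:", s.marked, "err:", err)
}
```

With the real sarama types, the same loop shape applies inside `ConsumeClaim`. Marking a message only after it has been handled means a failed message is not recorded as consumed.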
func (*KafkaSource) Setup
func (k *KafkaSource) Setup(session sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim.
type KafkaSourceConf
type KafkaSourceConf struct {
	source.SourceConfCommon `json:",inline" yaml:",inline"`
	FetchDefaultBytes       string   `json:"fetchDefaultBytes" yaml:"fetchDefaultBytes"`
	ChannelBufferSize       string   `json:"channelBufferSize" yaml:"channelBufferSize"`
	BalanceStrategy         string   `json:"balanceStrategy" yaml:"balanceStrategy"`
	RebalanceTimeout        int      `json:"rebalanceTimeout" yaml:"balanceStrategy"`
	Brokers                 []string `json:"brokers" yaml:"brokers"`
	Topics                  []string `json:"topics" yaml:"topics"`
	ConsumerGroup           string   `json:"consumerGroup" yaml:"consumerGroup"`
	Offset                  string   `json:"offset" yaml:"offset"`
}
KafkaSourceConf holds the Kafka source configuration.
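As a rough illustration of how these fields map to a JSON configuration, the sketch below decodes a sample document into a local copy of the struct. `kafkaSourceConf` is a hypothetical mirror of the fields listed above, without the embedded `source.SourceConfCommon`, so the example runs without the package. All values (broker addresses, `"roundrobin"`, `"newest"`, the byte and buffer sizes, and the timeout) are assumptions chosen for illustration; the accepted values and the unit of `rebalanceTimeout` are not stated here. JSON is used because the listed `yaml` tag for `RebalanceTimeout` repeats `balanceStrategy`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// kafkaSourceConf is a hypothetical local copy of the KafkaSourceConf fields.
// The embedded source.SourceConfCommon is left out to keep the example
// self-contained.
type kafkaSourceConf struct {
	FetchDefaultBytes string   `json:"fetchDefaultBytes"`
	ChannelBufferSize string   `json:"channelBufferSize"`
	BalanceStrategy   string   `json:"balanceStrategy"`
	RebalanceTimeout  int      `json:"rebalanceTimeout"`
	Brokers           []string `json:"brokers"`
	Topics            []string `json:"topics"`
	ConsumerGroup     string   `json:"consumerGroup"`
	Offset            string   `json:"offset"`
}

// exampleJSON holds illustrative values only; none of them come from the
// package documentation.
const exampleJSON = `{
  "fetchDefaultBytes": "1048576",
  "channelBufferSize": "256",
  "balanceStrategy": "roundrobin",
  "rebalanceTimeout": 60,
  "brokers": ["localhost:9092", "localhost:9093"],
  "topics": ["events"],
  "consumerGroup": "example-group",
  "offset": "newest"
}`

// parseConf decodes a JSON document into kafkaSourceConf.
func parseConf(data []byte) (*kafkaSourceConf, error) {
	var conf kafkaSourceConf
	if err := json.Unmarshal(data, &conf); err != nil {
		return nil, err
	}
	return &conf, nil
}

func main() {
	conf, err := parseConf([]byte(exampleJSON))
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println("brokers:", conf.Brokers)
	fmt.Println("topics:", conf.Topics)
	fmt.Println("group:", conf.ConsumerGroup)
}
```

Note that `FetchDefaultBytes` and `ChannelBufferSize` are declared as strings, so numeric values for them must be quoted in JSON, while `RebalanceTimeout` is a plain integer.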