Documentation ¶
Index ¶
- type ConsumerGroup
- type ConsumerGroupCreator
- type ConsumerGroupHandler
- func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error
- func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) error
- func (h *ConsumerGroupHandler) Reserve(ctx context.Context) error
- func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error
- type KafkaConsumer
- func (k *KafkaConsumer) Description() string
- func (k *KafkaConsumer) Gather(acc cua.Accumulator) error
- func (k *KafkaConsumer) Init() error
- func (k *KafkaConsumer) SampleConfig() string
- func (k *KafkaConsumer) SetParser(parser parsers.Parser)
- func (k *KafkaConsumer) Start(acc cua.Accumulator) error
- func (k *KafkaConsumer) Stop()
- type Message
- type SaramaCreator
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConsumerGroup ¶
type ConsumerGroupCreator ¶
type ConsumerGroupHandler ¶
type ConsumerGroupHandler struct { MaxMessageLen int TopicTag string // contains filtered or unexported fields }
ConsumerGroupHandler is a sarama.ConsumerGroupHandler implementation.
func NewConsumerGroupHandler ¶
func NewConsumerGroupHandler(acc cua.Accumulator, maxUndelivered int, parser parsers.Parser) *ConsumerGroupHandler
func (*ConsumerGroupHandler) Cleanup ¶
func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error
Cleanup stops the internal goroutine and is called after all ConsumeClaim functions have completed.
func (*ConsumerGroupHandler) ConsumeClaim ¶
func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim is called once each claim in a goroutine and must be thread-safe. Should run until the claim is closed.
func (*ConsumerGroupHandler) Handle ¶
func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) error
Handle processes a message and if successful saves it to be acknowledged after delivery.
func (*ConsumerGroupHandler) Reserve ¶
func (h *ConsumerGroupHandler) Reserve(ctx context.Context) error
Reserve blocks until there is an available slot for a new message.
func (*ConsumerGroupHandler) Setup ¶
func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error
Setup is called once when a new session is opened. It setups up the handler and begins processing delivered messages.
type KafkaConsumer ¶
type KafkaConsumer struct { Brokers []string `toml:"brokers"` ConsumerGroup string `toml:"consumer_group"` MaxMessageLen int `toml:"max_message_len"` MaxUndeliveredMessages int `toml:"max_undelivered_messages"` Offset string `toml:"offset"` BalanceStrategy string `toml:"balance_strategy"` Topics []string `toml:"topics"` TopicTag string `toml:"topic_tag"` kafka.ReadConfig Log cua.Logger `toml:"-"` ConsumerCreator ConsumerGroupCreator `toml:"-"` // contains filtered or unexported fields }
func (*KafkaConsumer) Description ¶
func (k *KafkaConsumer) Description() string
func (*KafkaConsumer) Gather ¶
func (k *KafkaConsumer) Gather(acc cua.Accumulator) error
func (*KafkaConsumer) Init ¶
func (k *KafkaConsumer) Init() error
func (*KafkaConsumer) SampleConfig ¶
func (k *KafkaConsumer) SampleConfig() string
func (*KafkaConsumer) SetParser ¶
func (k *KafkaConsumer) SetParser(parser parsers.Parser)
func (*KafkaConsumer) Start ¶
func (k *KafkaConsumer) Start(acc cua.Accumulator) error
func (*KafkaConsumer) Stop ¶
func (k *KafkaConsumer) Stop()
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
Message is an aggregate type binding the Kafka message and the session so that offsets can be updated.
type SaramaCreator ¶
type SaramaCreator struct{}
func (*SaramaCreator) Create ¶
func (*SaramaCreator) Create(brokers []string, group string, config *sarama.Config) (ConsumerGroup, error)