Versions in this module Expand all Collapse all v0 v0.7.0 Jul 19, 2023 Changes in this version + func NewBundleMetadata(partition int32, offset kafka.Offset) *bundleMetadata + func NewCommitter(committerInterval time.Duration, topic string, client *kafka.Consumer, ...) *committer + type Consumer interface + BundleRegister func(registration *registration.BundleRegistration) + CustomBundleRegister func(msgID string, customBundleRegistration *registration.CustomBundleRegistration) + GetGenericBundleChan func() chan *bundle.GenericBundle + Start func(ctx context.Context) error + type GenericConsumer struct + func NewGenericConsumer(transportConfig *transport.TransportConfig) (*GenericConsumer, error) + func (c *GenericConsumer) MessageChan() chan *transport.Message + func (c *GenericConsumer) Start(ctx context.Context) error + type KafkaConsumer struct + func NewKafkaConsumer(kafkaConfig *transport.KafkaConfig, log logr.Logger) (*KafkaConsumer, error) + func (c *KafkaConsumer) BundleRegister(registration *registration.BundleRegistration) + func (c *KafkaConsumer) Commit(msg *kafka.Message) error + func (c *KafkaConsumer) Consumer() *kafka.Consumer + func (c *KafkaConsumer) CustomBundleRegister(msgID string, customBundleRegistration *registration.CustomBundleRegistration) + func (c *KafkaConsumer) GetGenericBundleChan() chan *bundle.GenericBundle + func (c *KafkaConsumer) GetMessageChan() chan *kafka.Message + func (c *KafkaConsumer) SetCommitter(committer *committer) + func (c *KafkaConsumer) SetConflationManager(conflationMgr *conflator.ConflationManager) + func (c *KafkaConsumer) SetLeafHubName(leafHubName string) + func (c *KafkaConsumer) SetStatistics(statistics *statistics.Statistics) + func (c *KafkaConsumer) Start(ctx context.Context) error + func (c *KafkaConsumer) Subscribe(topic string) error + func (c *KafkaConsumer) SyncCustomBundle(customBundleRegistration *registration.CustomBundleRegistration, ...) error + type KafkaConsumerConfig struct + ConsumerID string + ConsumerTopic string + type SaramaConsumer interface + MarkOffset func(topic string, partition int32, offset int64) + MessageChan func() chan *sarama.ConsumerMessage + Start func(ctx context.Context) error + func NewSaramaConsumer(ctx context.Context, kafkaConfig *transport.KafkaConfig) (SaramaConsumer, error)