consumer

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 19, 2023 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewBundleMetadata

func NewBundleMetadata(partition int32, offset kafka.Offset) *bundleMetadata

NewBundleMetadata returns a new instance of BundleMetadata.

func NewCommitter

func NewCommitter(committerInterval time.Duration, topic string, client *kafka.Consumer,
	getBundlesMetadataFunc bundle.GetBundlesMetadataFunc, log logr.Logger,
) *committer

NewCommitter returns a new instance of committer.

Types

type Consumer

type Consumer interface {
	// Start starts the transport.
	Start(ctx context.Context) error
	// CustomBundleRegister registers a bundle ID to a CustomBundleRegistration. None-registered bundles are assumed to be
	// of type GenericBundle, and are handled by the GenericBundleSyncer.
	CustomBundleRegister(msgID string, customBundleRegistration *registration.CustomBundleRegistration)

	// BundleRegister function registers a msgID to the bundle updates channel.
	BundleRegister(registration *registration.BundleRegistration)

	// provide the generic bundle for message producer
	GetGenericBundleChan() chan *bundle.GenericBundle
}

Transport is an interface for transport layer.

type GenericConsumer

type GenericConsumer struct {
	// contains filtered or unexported fields
}

func NewGenericConsumer

func NewGenericConsumer(transportConfig *transport.TransportConfig) (*GenericConsumer, error)

func (*GenericConsumer) MessageChan

func (c *GenericConsumer) MessageChan() chan *transport.Message

func (*GenericConsumer) Start

func (c *GenericConsumer) Start(ctx context.Context) error

type KafkaConsumer

type KafkaConsumer struct {
	// contains filtered or unexported fields
}

Consumer abstracts hub-of-hubs/pkg/kafka kafka-consumer's generic usage.

func NewKafkaConsumer

func NewKafkaConsumer(kafkaConfig *transport.KafkaConfig, log logr.Logger,
) (*KafkaConsumer, error)

NewConsumer creates a new instance of Consumer.

func (*KafkaConsumer) BundleRegister

func (c *KafkaConsumer) BundleRegister(registration *registration.BundleRegistration)

Register function registers a msgID to the bundle updates channel.

func (*KafkaConsumer) Commit

func (c *KafkaConsumer) Commit(msg *kafka.Message) error

Commit commits a kafka message.

func (*KafkaConsumer) Consumer

func (c *KafkaConsumer) Consumer() *kafka.Consumer

Consumer returns the wrapped Confluent KafkaConsumer.

func (*KafkaConsumer) CustomBundleRegister

func (c *KafkaConsumer) CustomBundleRegister(msgID string,
	customBundleRegistration *registration.CustomBundleRegistration,
)

Register function registers a bundle ID to a CustomBundleRegistration.

func (*KafkaConsumer) GetGenericBundleChan

func (c *KafkaConsumer) GetGenericBundleChan() chan *bundle.GenericBundle

func (*KafkaConsumer) GetMessageChan

func (c *KafkaConsumer) GetMessageChan() chan *kafka.Message

func (*KafkaConsumer) SetCommitter

func (c *KafkaConsumer) SetCommitter(committer *committer)

func (*KafkaConsumer) SetConflationManager

func (c *KafkaConsumer) SetConflationManager(conflationMgr *conflator.ConflationManager)

func (*KafkaConsumer) SetLeafHubName

func (c *KafkaConsumer) SetLeafHubName(leafHubName string)

func (*KafkaConsumer) SetStatistics

func (c *KafkaConsumer) SetStatistics(statistics *statistics.Statistics)

func (*KafkaConsumer) Start

func (c *KafkaConsumer) Start(ctx context.Context) error

Start function starts the consumer.

func (*KafkaConsumer) Subscribe

func (c *KafkaConsumer) Subscribe(topic string) error

Subscribe subscribes consumer to the given topic.

func (*KafkaConsumer) SyncCustomBundle

func (c *KafkaConsumer) SyncCustomBundle(customBundleRegistration *registration.CustomBundleRegistration,
	payload []byte,
) error

SyncCustomBundle writes a custom bundle to its respective syncer channel.

type KafkaConsumerConfig

type KafkaConsumerConfig struct {
	ConsumerID    string
	ConsumerTopic string
}

type SaramaConsumer

type SaramaConsumer interface {
	Start(ctx context.Context) error
	MessageChan() chan *sarama.ConsumerMessage
	MarkOffset(topic string, partition int32, offset int64)
}

func NewSaramaConsumer

func NewSaramaConsumer(ctx context.Context, kafkaConfig *transport.KafkaConfig) (SaramaConsumer, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL