Documentation
Overview
Package kafka contains all the components required to consume and produce Kafka messages.
Types
type Consumer
type Consumer struct {
// contains filtered or unexported fields
}
func NewConsumer
func NewConsumer(
	bootstrapServer, groupID, topic string,
	incrementalRebalance bool,
	sessionTimeout, pollTimeout time.Duration,
	msgPool *bytebufferpool.Pool,
	metrics consumerMetrics,
	logger zerolog.Logger,
) *Consumer
NewConsumer creates an instance of a kafka.Consumer configured with the provided values.
func (*Consumer) Start
func (c *Consumer) Start(
	ctx context.Context,
	msgChan chan<- *bytebufferpool.ByteBuffer,
) error
Start starts the consumption of messages from Kafka and forwards the values to the provided channel.
If the context is cancelled, it pauses consumption and closes the channel to signal downstream receivers to shut down as well.
This should be run in a goroutine, preferably using golang.org/x/sync/errgroup to group all the concurrent goroutines.
type Producer
type Producer struct {
// contains filtered or unexported fields
}
func NewProducer
func NewProducer(
	bootstrapServer, topic string,
	flushInterval, msgTimeout, fullQueueCooldown time.Duration,
	msgRetries int,
	logDeliveryReports bool,
	msgPool *bytebufferpool.Pool,
	metrics producerMetrics,
	logger zerolog.Logger,
) *Producer
NewProducer creates an instance of a kafka.Producer configured with the provided values.
func (*Producer) Start
func (p *Producer) Start(ctx context.Context, msgChan <-chan *bytebufferpool.ByteBuffer) error
Start starts the production of Kafka messages with the values from the provided channel.
If the inbound channel is closed, production stops.
This should be run in a goroutine, preferably using golang.org/x/sync/errgroup to group all the concurrent goroutines.