kafka

package
v0.0.0-...-19f54a0 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 23, 2022 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package kafka provides utilities that simplify consuming from Kafka topics using Sarama.

Types

type Consumer

type Consumer interface {
	Consume(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
}

A Consumer is free to consume messages however it chooses. Consume returns an error if something goes wrong and nil otherwise.
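A minimal sketch of what a Consumer implementation might look like. To keep it compilable without the Sarama module, the types message, session and claim are local stand-ins (assumptions) for sarama.ConsumerMessage, sarama.ConsumerGroupSession and sarama.ConsumerGroupClaim, reduced to the members used here. The names simpleConsumer, rejectEmpty, fakeSession and fakeClaim are hypothetical and not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// message, session and claim are local stand-ins for the Sarama types,
// reduced to the members this sketch uses.
type message struct {
	Topic string
	Value []byte
}

type session interface {
	MarkMessage(msg *message, metadata string)
}

type claim interface {
	Messages() <-chan *message
}

// simpleConsumer is a hypothetical Consumer: it passes every message of the
// claim to handle and marks the message once handling succeeds.
type simpleConsumer struct {
	handle func(value []byte) error
}

// Consume loops until the claim's channel is closed or a message fails.
func (c simpleConsumer) Consume(s session, cl claim) error {
	for msg := range cl.Messages() {
		if err := c.handle(msg.Value); err != nil {
			return fmt.Errorf("handling message from %s: %w", msg.Topic, err)
		}
		s.MarkMessage(msg, "")
	}
	return nil
}

var errEmpty = errors.New("empty payload")

// rejectEmpty is an example handler that fails on empty payloads.
func rejectEmpty(value []byte) error {
	if len(value) == 0 {
		return errEmpty
	}
	return nil
}

// fakeSession counts how many messages were marked.
type fakeSession struct{ marked int }

func (f *fakeSession) MarkMessage(msg *message, metadata string) { f.marked++ }

// fakeClaim serves a fixed list of messages and then closes its channel.
type fakeClaim struct{ ch chan *message }

func (f fakeClaim) Messages() <-chan *message { return f.ch }

func newFakeClaim(values ...string) fakeClaim {
	ch := make(chan *message, len(values))
	for _, v := range values {
		ch <- &message{Topic: "demo", Value: []byte(v)}
	}
	close(ch)
	return fakeClaim{ch: ch}
}

func main() {
	s := &fakeSession{}
	err := simpleConsumer{handle: rejectEmpty}.Consume(s, newFakeClaim("a", "b"))
	fmt.Println("marked:", s.marked, "error:", err)
}
```

In real code the method would take the Sarama interfaces named in the Consumer signature above. Returning the first handler error without marking the failed message is one possible design choice, not necessarily what this package expects.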

type ConsumerChannel

type ConsumerChannel struct {
	// contains filtered or unexported fields
}

ConsumerChannel defines bidirectional, channel-based communication for consuming and acknowledging messages.

func NewConsumerChannel

func NewConsumerChannel() *ConsumerChannel

func (*ConsumerChannel) ConsumerAcknowledge

func (cc *ConsumerChannel) ConsumerAcknowledge()

func (*ConsumerChannel) ConsumerCancel

func (cc *ConsumerChannel) ConsumerCancel()

func (*ConsumerChannel) ConsumerWaitRead

func (cc *ConsumerChannel) ConsumerWaitRead() ([]byte, error)

func (*ConsumerChannel) ProducerWaitAcknowledge

func (cc *ConsumerChannel) ProducerWaitAcknowledge() (bool, error)

func (*ConsumerChannel) ProducerWrite

func (cc *ConsumerChannel) ProducerWrite(message []byte) error

func (*ConsumerChannel) WaitUntilFinished

func (cc *ConsumerChannel) WaitUntilFinished()
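The fields of ConsumerChannel are unexported and its exact semantics are not documented here, so the following is only a sketch of the general write, read, acknowledge handshake that the method names suggest. The type ackChannel and all of its methods are hypothetical stand-ins, and the timeout behaviour and the meaning of a cancel (reported to the producer as false) are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errTimeout = errors.New("timed out")

// ackChannel is a hypothetical stand-in for ConsumerChannel: one channel
// carries messages to the consumer, the other carries the verdict back.
type ackChannel struct {
	messages chan []byte
	acks     chan bool
	timeout  time.Duration
}

func newAckChannel(timeout time.Duration) *ackChannel {
	return &ackChannel{
		messages: make(chan []byte),
		// Buffered so the consumer can answer even if the producer gave up.
		acks:    make(chan bool, 1),
		timeout: timeout,
	}
}

// write hands a message to the consumer side or gives up after the timeout.
func (a *ackChannel) write(msg []byte) error {
	select {
	case a.messages <- msg:
		return nil
	case <-time.After(a.timeout):
		return errTimeout
	}
}

// waitRead blocks until a message arrives or the timeout expires.
func (a *ackChannel) waitRead() ([]byte, error) {
	select {
	case msg := <-a.messages:
		return msg, nil
	case <-time.After(a.timeout):
		return nil, errTimeout
	}
}

// acknowledge and cancel report success or failure back to the producer.
func (a *ackChannel) acknowledge() { a.acks <- true }
func (a *ackChannel) cancel()      { a.acks <- false }

// waitAck blocks until the consumer has answered or the timeout expires.
func (a *ackChannel) waitAck() (bool, error) {
	select {
	case ok := <-a.acks:
		return ok, nil
	case <-time.After(a.timeout):
		return false, errTimeout
	}
}

// roundTrip runs one producer/consumer exchange and returns the verdict.
func roundTrip(a *ackChannel, msg []byte, accept func([]byte) bool) (bool, error) {
	go func() {
		got, err := a.waitRead()
		if err != nil {
			return
		}
		if accept(got) {
			a.acknowledge()
		} else {
			a.cancel()
		}
	}()
	if err := a.write(msg); err != nil {
		return false, err
	}
	return a.waitAck()
}

func nonEmpty(b []byte) bool { return len(b) > 0 }

func main() {
	a := newAckChannel(time.Second)
	ok, err := roundTrip(a, []byte("hello"), nonEmpty)
	fmt.Println("acknowledged:", ok, "error:", err)
}
```

Under these assumptions, write and waitAck would correspond to the Producer-prefixed methods, and waitRead, acknowledge and cancel to the Consumer-prefixed ones.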

type ConsumerGroup

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

ConsumerGroup represents a consumer that takes part in a Sarama consumer group.

func (*ConsumerGroup) Cleanup

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.

func (*ConsumerGroup) ConsumeClaim

func (c *ConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*ConsumerGroup) Setup

Setup is run at the beginning of a new session, before ConsumeClaim.

func (*ConsumerGroup) WaitUntilReady

func (c *ConsumerGroup) WaitUntilReady()
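How WaitUntilReady is implemented is not shown on this page. A common pattern for this kind of method, assumed here, is a channel that the session setup closes once, so that callers blocked on it are released. The type readyGroup and its methods are hypothetical stand-ins for ConsumerGroup, Setup and WaitUntilReady.

```go
package main

import (
	"fmt"
	"sync"
)

// readyGroup is a hypothetical stand-in for ConsumerGroup that only models
// the readiness signal.
type readyGroup struct {
	ready chan struct{}
	once  sync.Once
}

func newReadyGroup() *readyGroup {
	return &readyGroup{ready: make(chan struct{})}
}

// setup plays the role of Setup. It may run at the start of every session,
// so the channel is closed through sync.Once to avoid a double close.
func (g *readyGroup) setup() error {
	g.once.Do(func() { close(g.ready) })
	return nil
}

// waitUntilReady blocks until setup has run at least once.
func (g *readyGroup) waitUntilReady() {
	<-g.ready
}

// isReady reports the current state without blocking.
func (g *readyGroup) isReady() bool {
	select {
	case <-g.ready:
		return true
	default:
		return false
	}
}

func main() {
	g := newReadyGroup()
	go g.setup() // in a real consumer the session machinery would call Setup
	g.waitUntilReady()
	fmt.Println("consumer group is ready:", g.isReady())
}
```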

type Listener

type Listener struct {
	// contains filtered or unexported fields
}

Listener lets a Consumer handle Kafka messages without much hassle.

func NewListener

func NewListener(config ListenerConfig, consumer Consumer) *Listener

func (*Listener) Listen

func (l *Listener) Listen()

type ListenerConfig

type ListenerConfig struct {
	BootstrapServer string
	Topics          []string
	GroupId         string
	Username        string
	Password        string
}

ListenerConfig defines the settings a Listener needs to connect to a Kafka broker.
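A sketch of filling in a ListenerConfig. The struct is mirrored locally so the example compiles on its own; in real code it would come from this package. The broker address, topic, group and credentials are placeholder values, and validate is a hypothetical helper that is not part of the package. Whether Username and Password are optional is not stated on this page, so the helper does not check them.

```go
package main

import (
	"errors"
	"fmt"
)

// ListenerConfig mirrors the documented struct so the sketch is standalone.
type ListenerConfig struct {
	BootstrapServer string
	Topics          []string
	GroupId         string
	Username        string
	Password        string
}

// validate is a hypothetical helper that rejects obviously incomplete
// settings before they are handed to NewListener.
func validate(c ListenerConfig) error {
	if c.BootstrapServer == "" {
		return errors.New("BootstrapServer must not be empty")
	}
	if len(c.Topics) == 0 {
		return errors.New("Topics must list at least one topic")
	}
	if c.GroupId == "" {
		return errors.New("GroupId must not be empty")
	}
	return nil
}

// exampleConfig returns placeholder settings for a local broker.
func exampleConfig() ListenerConfig {
	return ListenerConfig{
		BootstrapServer: "localhost:9092",
		Topics:          []string{"orders"},
		GroupId:         "orders-consumer",
		Username:        "user",
		Password:        "secret",
	}
}

func main() {
	cfg := exampleConfig()
	if err := validate(cfg); err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Println("config ok for group", cfg.GroupId)
}
```

With the real package, a value like cfg would then be passed to NewListener together with a Consumer, followed by a call to Listen.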
