kafka

v2.0.0
Warning

This package is not in the latest version of its module.

Published: Oct 28, 2021 License: MIT Imports: 10 Imported by: 0

README

kafka-client-go

Description

Library for producing and consuming messages directly from Kafka.

Only perseverant connections are provided, for both producer and consumer: establishing a connection to Kafka is retried until it succeeds.

Attempting to consume or produce messages, or to check connectivity, fails if the connection has not been established yet.
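
The retry-until-connected behaviour described above can be pictured with the following standalone sketch. It uses neither this library nor Sarama: `retryingClient`, `newRetryingClient`, `Send`, `flakyDial` and `errNotConnected` are hypothetical names chosen for illustration, and the real implementation may differ.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// errNotConnected is a hypothetical error returned while no connection exists yet.
var errNotConnected = errors.New("connection is not established yet")

// retryingClient is an illustrative stand-in, not the library's Producer or Consumer.
type retryingClient struct {
	mu        sync.RWMutex
	connected bool
	ready     chan struct{} // closed once the connection is established
}

// newRetryingClient starts connecting in a background goroutine and
// calls dial again every retryInterval until it succeeds.
func newRetryingClient(dial func() error, retryInterval time.Duration) *retryingClient {
	c := &retryingClient{ready: make(chan struct{})}
	go func() {
		for {
			if err := dial(); err == nil {
				c.mu.Lock()
				c.connected = true
				c.mu.Unlock()
				close(c.ready)
				return
			}
			time.Sleep(retryInterval)
		}
	}()
	return c
}

// Send fails fast while the connection has not been established.
func (c *retryingClient) Send(msg string) error {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if !c.connected {
		return errNotConnected
	}
	return nil // a real client would write msg to the broker here
}

// flakyDial returns a dial function that fails the first `failures` times.
func flakyDial(failures int) func() error {
	var mu sync.Mutex
	attempts := 0
	return func() error {
		mu.Lock()
		defer mu.Unlock()
		attempts++
		if attempts <= failures {
			return errors.New("broker unavailable")
		}
		return nil
	}
}

func main() {
	c := newRetryingClient(flakyDial(2), 10*time.Millisecond)
	fmt.Println("before connect:", c.Send("hello"))
	<-c.ready // wait until the background goroutine has connected
	fmt.Println("after connect:", c.Send("hello"))
}
```

The point of the sketch is the division of labour: the constructor never fails, connection attempts happen in the background, and every operation checks the connection state and returns an error rather than blocking.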

The library does NOT use Zookeeper to connect to Kafka under the hood.

Usage

Importing:

    import "github.com/Financial-Times/kafka-client-go/v2"

Producer

Creating a producer:

    config := kafka.ProducerConfig{
        BrokersConnectionString: "", // Comma-separated list of Kafka brokers
        Topic:                   "", // Topic to publish to
    }

    initialDelay := time.Second  // Duration to wait before the first connection attempt
    retryInterval := time.Second // Duration between connection retries

    log := logger.NewUPPLogger(...) // Named log so that the logger package is not shadowed

    producer := kafka.NewProducer(config, log, initialDelay, retryInterval)

The connection to Kafka is started in a separate goroutine when the producer is created.

Sending a message:

    headers := map[string]string{}
    body := ""
    message := kafka.NewFTMessage(headers, body)
    
    err := producer.SendMessage(message)
    // Error handling

Connection should be closed by the client:

    err := producer.Close()
    // Error handling

Consumer

Creating a consumer:

    config := kafka.ConsumerConfig{
        BrokersConnectionString: "", // Comma-separated list of Kafka brokers
        ConsumerGroup:           "", // Unique name of a consumer group
        Topics:                  []string{}, // List of topics to consume from
    }
    
    retryInterval := time.Second // Duration between each retry for establishing connection
    
    log := logger.NewUPPLogger(...) // Named log so that the logger package is not shadowed

    consumer := kafka.NewConsumer(config, log, retryInterval)

Consuming messages:

Consumer groups are lazily initialized, i.e. the connection to Kafka is established within StartListening().

    handler := func(message kafka.FTMessage) {
        // Message handling
    }
    
    go consumer.StartListening(handler) // Blocking until connection is established

Connections should be closed by the client:

    err := consumer.Close()
    // Error handling
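
The listen-then-close lifecycle above can be tried out without a broker using a stand-in. In the sketch below, `message`, `fakeConsumer`, `newFakeConsumer` and `collectBodies` are hypothetical and exist only for this example; unlike the real consumer, the stand-in's StartListening keeps blocking until Close is called, which is why the sketch waits for it to return.

```go
package main

import (
	"fmt"
	"sync"
)

// message mirrors the shape of FTMessage (Headers and Body) for this sketch only.
type message struct {
	Headers map[string]string
	Body    string
}

// fakeConsumer is a hypothetical stand-in with a blocking StartListening and a Close.
type fakeConsumer struct {
	messages chan message
	done     chan struct{}
}

// newFakeConsumer preloads the messages that the fake will deliver.
func newFakeConsumer(msgs ...message) *fakeConsumer {
	c := &fakeConsumer{
		messages: make(chan message, len(msgs)),
		done:     make(chan struct{}),
	}
	for _, m := range msgs {
		c.messages <- m
	}
	return c
}

// StartListening blocks, invoking handler for each message, until Close is called.
func (c *fakeConsumer) StartListening(handler func(message)) {
	for {
		select {
		case m := <-c.messages:
			handler(m)
		case <-c.done:
			return
		}
	}
}

// Close stops the listening loop.
func (c *fakeConsumer) Close() error {
	close(c.done)
	return nil
}

// collectBodies listens in a goroutine, waits for n messages, then closes the consumer.
func collectBodies(c *fakeConsumer, n int) []string {
	var (
		mu     sync.Mutex
		bodies []string
		wg     sync.WaitGroup
	)
	wg.Add(n)
	handler := func(m message) {
		mu.Lock()
		bodies = append(bodies, m.Body)
		mu.Unlock()
		wg.Done()
	}

	stopped := make(chan struct{})
	go func() {
		defer close(stopped)
		c.StartListening(handler)
	}()

	wg.Wait()     // every expected message has been handled
	_ = c.Close() // the client is responsible for closing
	<-stopped     // the listening goroutine has returned
	return bodies
}

func main() {
	c := newFakeConsumer(message{Body: "first"}, message{Body: "second"})
	fmt.Println(collectBodies(c, 2)) // prints [first second]
}
```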

Testing

    go test --race -v ./...

NB: Some tests in this project require a local Kafka instance (port 29092). Use the -short flag to skip them.

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultConsumerOptions

func DefaultConsumerOptions() *sarama.Config

DefaultConsumerOptions returns a new sarama configuration with predefined default settings.

func DefaultProducerOptions

func DefaultProducerOptions() *sarama.Config

DefaultProducerOptions creates a new Sarama producer configuration with default values.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer which will keep trying to reconnect to Kafka on a specified interval. The underlying consumer group is created lazily when message listening is started.

func NewConsumer

func NewConsumer(config ConsumerConfig, log *logger.UPPLogger, retryInterval time.Duration) *Consumer

func (*Consumer) Close

func (c *Consumer) Close() error

Close closes the consumer connection to Kafka if the consumer is connected.

func (*Consumer) ConnectivityCheck

func (c *Consumer) ConnectivityCheck() error

ConnectivityCheck checks whether a connection to Kafka can be established.

func (*Consumer) StartListening

func (c *Consumer) StartListening(messageHandler func(message FTMessage))

StartListening is a blocking call that tries to establish a connection to Kafka and then starts listening.

type ConsumerConfig

type ConsumerConfig struct {
	BrokersConnectionString string
	ConsumerGroup           string
	Topics                  []string
	Options                 *sarama.Config
}
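
Both config types take `BrokersConnectionString` as a comma-separated list of brokers. A minimal sketch of how such a string could be split into individual addresses follows; `splitBrokers` is a hypothetical helper that is not part of this package, and the library's own parsing may differ (for example in how it treats whitespace or empty entries).

```go
package main

import (
	"fmt"
	"strings"
)

// splitBrokers is a hypothetical helper that turns a comma-separated
// connection string into a list of broker addresses, dropping blank entries.
func splitBrokers(connectionString string) []string {
	var brokers []string
	for _, b := range strings.Split(connectionString, ",") {
		if b = strings.TrimSpace(b); b != "" {
			brokers = append(brokers, b)
		}
	}
	return brokers
}

func main() {
	fmt.Println(splitBrokers("kafka-1:9092, kafka-2:9092,")) // prints [kafka-1:9092 kafka-2:9092]
}
```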

type FTMessage

type FTMessage struct {
	Headers map[string]string
	Body    string
}

func NewFTMessage

func NewFTMessage(headers map[string]string, body string) FTMessage

func (*FTMessage) Build

func (m *FTMessage) Build() string

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer which will keep trying to reconnect to Kafka on a specified interval. The underlying producer is created in a separate goroutine when the Producer is initialized.

func NewProducer

func NewProducer(config ProducerConfig, logger *logger.UPPLogger, initialDelay, retryInterval time.Duration) *Producer

func (*Producer) Close

func (p *Producer) Close() error

Close closes the connection to Kafka if the producer is connected.

func (*Producer) ConnectivityCheck

func (p *Producer) ConnectivityCheck() error

ConnectivityCheck checks whether a connection to Kafka can be established.
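
Because *Consumer and *Producer both expose `ConnectivityCheck() error`, a small interface can cover either of them, for instance in a health endpoint. The following is a sketch under assumptions: `connectivityChecker`, `healthHandler`, `fakeChecker`, `statusFor` and the `/__health` path are hypothetical and not part of this package.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// connectivityChecker matches the ConnectivityCheck method documented on
// both *Consumer and *Producer.
type connectivityChecker interface {
	ConnectivityCheck() error
}

// errDown is a sample error used by the fake checker below.
var errDown = errors.New("not connected")

// healthHandler is a hypothetical helper: it responds 200 when every checker
// succeeds and 503 with the first error otherwise.
func healthHandler(checkers ...connectivityChecker) http.HandlerFunc {
	return func(w http.ResponseWriter, _ *http.Request) {
		for _, c := range checkers {
			if err := c.ConnectivityCheck(); err != nil {
				http.Error(w, "kafka unavailable: "+err.Error(), http.StatusServiceUnavailable)
				return
			}
		}
		w.WriteHeader(http.StatusOK)
		fmt.Fprintln(w, "ok")
	}
}

// fakeChecker stands in for a real producer or consumer in this sketch.
type fakeChecker struct{ err error }

func (f fakeChecker) ConnectivityCheck() error { return f.err }

// statusFor runs the handler once and returns the HTTP status code.
func statusFor(checkers ...connectivityChecker) int {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/__health", nil)
	healthHandler(checkers...)(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(statusFor(fakeChecker{}))             // prints 200
	fmt.Println(statusFor(fakeChecker{err: errDown})) // prints 503
}
```

In a real service the fakes would be replaced by the producer and consumer values, which would satisfy the interface through the documented method.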

func (*Producer) SendMessage

func (p *Producer) SendMessage(message FTMessage) error

SendMessage checks if the producer is connected, then sends a message to Kafka.

type ProducerConfig

type ProducerConfig struct {
	BrokersConnectionString string
	Topic                   string
	Options                 *sarama.Config
}
