kafkaclient

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 21, 2020 License: MIT Imports: 10 Imported by: 0

README

Go Kafka Client Library Mit License Actions Status codecov

Forked from: https://github.com/uber-go/kafka-client

A high level Go client library for Apache Kafka that provides the following primitives on top of sarama-cluster:

  • Competing consumer semantics with dead letter queue (DLQ)
    • Ability to process messages across multiple goroutines
    • Ability to Ack or Nack messages out of order (with optional DLQ)
  • Ability to consume from topics spread across different kafka clusters

Stability

This library is now stable. Uber has archived their version of it. I have forked it to provide some additinal functionality for a few projects that use it.

Installation

go get -u github.com/AngerM/kafka-client

Quick Start

package main

import (
	"os"
	"os/signal"

	"github.com/AngerM/kafka-client"
	"github.com/AngerM/kafka-client/kafka"
	"github.com/uber-go/tally"
	"go.uber.org/zap"
)

func main() {
	// mapping from cluster name to list of broker ip addresses
	brokers := map[string][]string{
		"sample_cluster":     []string{"127.0.0.1:9092"},
		"sample_dlq_cluster": []string{"127.0.0.1:9092"},
	}
	// mapping from topic name to cluster that has that topic
	topicClusterAssignment := map[string][]string{
		"sample_topic": []string{"sample_cluster"},
	}

	// First create the kafkaclient, its the entry point for creating consumers or producers
	// It takes as input a name resolver that knows how to map topic names to broker ip addrs
	client := kafkaclient.New(kafka.NewStaticNameResolver(topicClusterAssignment, brokers), zap.NewNop(), tally.NoopScope)

	// Next, setup the consumer config for consuming from a set of topics
	config := &kafka.ConsumerConfig{
		TopicList: kafka.ConsumerTopicList{
			kafka.ConsumerTopic{ // Consumer Topic is a combination of topic + dead-letter-queue
				Topic: kafka.Topic{ // Each topic is a tuple of (name, clusterName)
					Name:    "sample_topic",
					Cluster: "sample_cluster",
				},
				DLQ: kafka.Topic{
					Name:    "sample_consumer_dlq",
					Cluster: "sample_dlq_cluster",
				},
			},
		},
		GroupName:   "sample_consumer",
		Concurrency: 100, // number of go routines processing messages in parallel
	}

	// Create the consumer through the previously created client
	consumer, err := client.NewConsumer(config)
	if err != nil {
		panic(err)
	}

	// Finally, start consuming
	if err := consumer.Start(); err != nil {
		panic(err)
	}

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt)

	for {
		select {
		case msg, ok := <-consumer.Messages():
			if !ok {
				return // channel closed
			}
			if err := process(msg); err != nil {
				msg.Nack()
			} else {
				msg.Ack()
			}
		case <-sigCh:
			consumer.Stop()
			<-consumer.Closed()
		}
	}
}

Documentation

Index

Constants

This section is empty.

Variables

SHA256 exporting for SASL based auth

Functions

This section is empty.

Types

type Client

type Client interface {
	// NewConsumer returns a new instance of kafka consumer.
	NewConsumer(config *kafka.ConsumerConfig, consumerOpts ...ConsumerOption) (kafka.Consumer, error)
}

Client refers to the kafka client. Serves as the entry point to producing or consuming messages from kafka

func New

func New(resolver kafka.NameResolver, logger *zap.Logger, scope tally.Scope) Client

New returns a new kafka client.

type ConsumerOption

type ConsumerOption interface {
	// contains filtered or unexported methods
}

ConsumerOption is the type for optional arguments to the NewConsumer constructor.

func WithClientID

func WithClientID(clientID string) ConsumerOption

WithClientID sets client id.

func WithDLQTopics

func WithDLQTopics(topicList kafka.ConsumerTopicList) ConsumerOption

WithDLQTopics creates a range consumer for the specified consumer DLQ topics.

func WithRetryTopics

func WithRetryTopics(topicList kafka.ConsumerTopicList) ConsumerOption

WithRetryTopics creates a consumer for the specified consumer Retry topics.

type ScramClient added in v1.0.0

type ScramClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

ScramClient is used for SASL based auth This is mainly a wrapper to satisfy sarama

func (*ScramClient) Begin added in v1.0.0

func (x *ScramClient) Begin(userName, password, authzID string) (err error)

Begin is used for SASL

func (*ScramClient) Done added in v1.0.0

func (x *ScramClient) Done() bool

Done is used for SASL

func (*ScramClient) Step added in v1.0.0

func (x *ScramClient) Step(challenge string) (response string, err error)

Step is used for SASL

Directories

Path Synopsis
internal
consumer
Package consumer is a generated protocol buffer package.
Package consumer is a generated protocol buffer package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL