kafkaclient

package module
v0.2.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 27, 2021 License: MIT Imports: 8 Imported by: 0

README

Go Kafka Client Library Mit License Build Status Coverage Status

A high level Go client library for Apache Kafka that provides the following primitives on top of sarama-cluster:

  • Competing consumer semantics with dead letter queue (DLQ)
    • Ability to process messages across multiple goroutines
    • Ability to Ack or Nack messages out of order (with optional DLQ)
  • Ability to consume from topics spread across different kafka clusters

Stability

This library is in alpha. APIs are subject to change, use at your own risk

Contributing

If you are interested in contributing, please sign the License Agreement and see our development guide

Installation

go get -u github.com/uber-go/kafka-client

Quick Start

package main

import (
	"os"
	"os/signal"

	"github.com/uber-go/kafka-client"
	"github.com/uber-go/kafka-client/kafka"
	"github.com/uber-go/tally"
	"go.uber.org/zap"
)

func main() {
	// mapping from cluster name to list of broker ip addresses
	brokers := map[string][]string{
		"sample_cluster":     []string{"127.0.0.1:9092"},
		"sample_dlq_cluster": []string{"127.0.0.1:9092"},
	}
	// mapping from topic name to cluster that has that topic
	topicClusterAssignment := map[string][]string{
		"sample_topic": []string{"sample_cluster"},
	}

	// First create the kafkaclient, its the entry point for creating consumers or producers
	// It takes as input a name resolver that knows how to map topic names to broker ip addrs
	client := kafkaclient.New(kafka.NewStaticNameResolver(topicClusterAssignment, brokers), zap.NewNop(), tally.NoopScope)

	// Next, setup the consumer config for consuming from a set of topics
	config := &kafka.ConsumerConfig{
		TopicList: kafka.ConsumerTopicList{
			kafka.ConsumerTopic{ // Consumer Topic is a combination of topic + dead-letter-queue
				Topic: kafka.Topic{ // Each topic is a tuple of (name, clusterName)
					Name:    "sample_topic",
					Cluster: "sample_cluster",
				},
				DLQ: kafka.Topic{
					Name:    "sample_consumer_dlq",
					Cluster: "sample_dlq_cluster",
				},
			},
		},
		GroupName:   "sample_consumer",
		Concurrency: 100, // number of go routines processing messages in parallel
	}

	// Create the consumer through the previously created client
	consumer, err := client.NewConsumer(config)
	if err != nil {
		panic(err)
	}

	// Finally, start consuming
	if err := consumer.Start(); err != nil {
		panic(err)
	}

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt)

	for {
		select {
		case msg, ok := <-consumer.Messages():
			if !ok {
				return // channel closed
			}
			if err := process(msg); err != nil {
				msg.Nack()
			} else {
				msg.Ack()
			}
		case <-sigCh:
			consumer.Stop()
			<-consumer.Closed()
		}
	}
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client interface {
	// NewConsumer returns a new instance of kafka consumer.
	NewConsumer(config *kafka.ConsumerConfig, consumerOpts ...ConsumerOption) (kafka.Consumer, error)
}

Client refers to the kafka client. Serves as the entry point to producing or consuming messages from kafka

func New

func New(resolver kafka.NameResolver, logger *zap.Logger, scope tally.Scope) Client

New returns a new kafka client.

type ConsumerOption

type ConsumerOption interface {
	// contains filtered or unexported methods
}

ConsumerOption is the type for optional arguments to the NewConsumer constructor.

func WithAdditionalOptions added in v0.2.3

func WithAdditionalOptions(interval time.Duration) ConsumerOption

func WithClientID added in v0.2.2

func WithClientID(clientID string) ConsumerOption

WithClientID sets client id.

func WithDLQTopics added in v0.1.3

func WithDLQTopics(topicList kafka.ConsumerTopicList) ConsumerOption

WithDLQTopics creates a range consumer for the specified consumer DLQ topics.

func WithRetryTopics added in v0.1.3

func WithRetryTopics(topicList kafka.ConsumerTopicList) ConsumerOption

WithRetryTopics creates a consumer for the specified consumer Retry topics.

Directories

Path Synopsis
internal
consumer
Package consumer is a generated protocol buffer package.
Package consumer is a generated protocol buffer package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL