kafka

package module
v0.0.0-...-cdd6f54
Published: Aug 17, 2021 License: GPL-3.0 Imports: 16 Imported by: 6

README

bwNetFlow Go Kafka Connector

This is an opinionated implementation of a common Connector module for all of our official components and, optionally, for users of our platform who intend to write client applications in Go. It provides an abstraction over plain Sarama and supports consuming topics as well as producing to multiple topics, converting every message according to our protobuf definition for Flow messages (which is based on goflow's definition).


Example Usage in Consumer-only mode:

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/Shopify/sarama"
	kafka "github.com/bwNetFlow/kafkaconnector"
)

var kafkaConn = kafka.Connector{}

func main() {
	fmt.Printf("welcome... let's go!\n")

	// prepare all variables
	broker := "127.0.0.1:9092,[::1]:9092" // TODO: set valid uris
	topic := []string{"flow-messages-anon"}
	consumerGroup := "anon-golang-example"
	kafkaConn.SetAuthAnon() // optionally: change to SetAuthFromEnv() or SetAuth(user string, pass string)

	kafkaConn.EnablePrometheus(":2112") // optionally open up for monitoring

	// ensure a clean exit
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigchan
		fmt.Println("Signal caught, exiting...")
		kafkaConn.Close()
	}()

	// receive flows
	kafkaConn.StartConsumer(broker, topic, consumerGroup, sarama.OffsetNewest)
	var flowCounter, byteCounter uint64
	for flow := range kafkaConn.ConsumerChannel() {
		// process the flow here ...
		flowCounter++
		byteCounter += flow.GetBytes()
		fmt.Printf("\rflows: %d, bytes: %d GB", flowCounter, byteCounter/1024/1024/1024)
	}
}

Example Usage in Consumer/Producer mode:

Check out processor_splitter; it is very simple and consumes a single topic while producing to multiple target topics.
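
For a rough idea of that pattern without leaving this page, here is a minimal sketch that consumes one topic and forwards every flow unchanged to a single target topic, using only the API documented below. Broker address and topic names are placeholders and error handling is omitted, as in the example above; a real splitter would pick the target topic per flow.

package main

import (
	"github.com/Shopify/sarama"
	kafka "github.com/bwNetFlow/kafkaconnector"
)

func main() {
	kafkaConn := kafka.Connector{}
	kafkaConn.SetAuthAnon() // or SetAuthFromEnv() / SetAuth(user, pass)

	broker := "127.0.0.1:9092" // placeholder, see the Consumer-only example
	kafkaConn.StartConsumer(broker, []string{"flow-messages-anon"}, "example-forwarder", sarama.OffsetNewest)
	kafkaConn.StartProducer(broker)
	defer kafkaConn.Close()

	// hand every consumed flow over to the producer for one target topic
	target := kafkaConn.ProducerChannel("flow-messages-out") // placeholder topic
	for msg := range kafkaConn.ConsumerChannel() {
		target <- msg
	}
}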

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Connector

type Connector struct {
	// contains filtered or unexported fields
}

Connector handles a connection to read bwNetFlow flows from Kafka.

func (*Connector) Close

func (connector *Connector) Close()

func (*Connector) ConsumerChannel

func (connector *Connector) ConsumerChannel() <-chan *flow.FlowMessage

Return the channel used for receiving Flows from the Kafka Consumer. If this channel closes, it means the upstream Kafka Consumer has closed the channel that feeds the last decoding step. You can restart the Consumer by calling .StartConsumer() on the same Connector object.
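
For instance, a long-running client could restart the Consumer whenever the channel closes. A minimal sketch, assuming the kafka and sarama imports from the README example; the helper name and its parameters are illustrative, not part of this package:

// consumeForever restarts the Consumer whenever its channel closes.
func consumeForever(conn *kafka.Connector, broker string, topics []string, group string) {
	for {
		if err := conn.StartConsumer(broker, topics, group, sarama.OffsetNewest); err != nil {
			return // could not (re)start the Consumer
		}
		for msg := range conn.ConsumerChannel() {
			_ = msg // process the flow here
		}
		// the upstream Consumer closed its channel; loop to restart it
	}
}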

func (*Connector) DisableAuth

func (connector *Connector) DisableAuth()

DisableAuth disables authentication.

func (*Connector) DisableTLS

func (connector *Connector) DisableTLS()

DisableTLS disables the SSL/TLS connection.

func (*Connector) EnablePrometheus

func (connector *Connector) EnablePrometheus(listen string)

EnablePrometheus enables the metric exporter for both Consumer and Producer.

func (*Connector) NewBaseConfig

func (connector *Connector) NewBaseConfig() *sarama.Config

NewBaseConfig returns a new base sarama.Config for use with this Connector.

func (*Connector) ProducerChannel

func (connector *Connector) ProducerChannel(topic string) chan *flow.FlowMessage

Return the channel used for handing over Flows to the Kafka Producer. If writing to this channel blocks, check the log.

func (*Connector) SetAuth

func (connector *Connector) SetAuth(user string, pass string)

SetAuth explicitly sets which login to use for SASL/PLAIN auth via TLS.

func (*Connector) SetAuthAnon

func (connector *Connector) SetAuthAnon()

Set anonymous credentials as the login method.

func (*Connector) SetAuthFromEnv

func (connector *Connector) SetAuthFromEnv() error

Check the environment to infer which login to use for SASL/PLAIN auth via TLS. Requires KAFKA_SASL_USER and KAFKA_SASL_PASS to be set for this process.
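
A minimal sketch of using this, falling back to anonymous credentials when the variables are absent (the fallback is an illustration, not behaviour of the package; assumes the standard library log package and the kafka import from the README example):

// Expects the credentials to be exported before the process starts, e.g.:
//   export KAFKA_SASL_USER=myuser
//   export KAFKA_SASL_PASS=mypass
kafkaConn := kafka.Connector{}
if err := kafkaConn.SetAuthFromEnv(); err != nil {
	log.Println("no SASL credentials in environment, falling back to anonymous auth")
	kafkaConn.SetAuthAnon()
}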

func (*Connector) StartConsumer

func (connector *Connector) StartConsumer(brokers string, topics []string, group string, offset int64) error

Start a Kafka Consumer with the specified parameters. Its output will be available in the channel returned by ConsumerChannel.

func (*Connector) StartProducer

func (connector *Connector) StartProducer(broker string) error

Start a Kafka Producer with the specified parameters. The channel returned by ProducerChannel will accept your input.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer represents a Sarama consumer group consumer

func (*Consumer) Cleanup

func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*Consumer) Close

func (consumer *Consumer) Close()

func (*Consumer) ConsumeClaim

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*Consumer) Setup

func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error

Setup is run at the beginning of a new session, before ConsumeClaim
