kafka

v0.0.5
Published: Jun 5, 2021 License: MIT Imports: 8 Imported by: 3

README

go-kafka

A library that wraps segmentio/kafka-go. It provides the bare essential configuration needed to register publishers and subscribers with a running Kafka cluster.

What is Kafka?

Kafka is a framework implementation of a software bus using stream processing. It provides a messaging queue that services interact with as publishers and subscribers. Once Kafka is running, a service written in any language with a Kafka client can register itself as a publisher or subscriber of a topic. It also lets multiple services exchange messages in sequence without calling each other's REST endpoints.

Learning outcomes

  • The configuration involved in creating a Kafka publisher and consumer.
  • Creating a Kafka topic to build messaging queues.
  • Setting up a Kafka cluster using Confluent images for ZooKeeper and Kafka, for replication and balancing.
  • Passing functions as method arguments to read messages from the cluster.

Pre-requisites

  • A Kafka cluster must be running on your machine.
  • A Kafka topic must be registered with the cluster.

How to implement

  • Runner.go demonstrates one way of configuring the Kafka client. The configuration values are read with a flag parser and assigned to variables.

  • You can import the package using Go modules or any other dependency management tool.

import (
	//...
	"github.com/atulanand206/go-kafka"
	//...
)
  • Store the topic and broker ID in variables. I prefer reading both from environment variables.
kafkaTopic := os.Getenv("KAFKA_TOPIC")
kafkaBrokerId := os.Getenv("KAFKA_BROKER_ID")
  • Configure the publisher in your application's runner method to initialize publishing to the Kafka cluster.
kafka.LoadPublisher(kafkaBrokerId, kafkaTopic)
  • Call the Push method wherever you'd like to publish to the Kafka topic.
formInBytes, err := json.Marshal(jsonObject)
if err != nil {
    log.Panic(err)
}
if err := kafka.Push(nil, formInBytes); err != nil {
    log.Panic(err)
}
  • Configure the Reader in your application's runner method to listen to the topic in the Kafka cluster.
kafka.LoadConsumer(kafkaBrokerId, kafkaTopic)
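  • Call the Read method wherever you'd like to listen to the Kafka topic.
kafka.Read(func(val string) {
    jsonObject := &object{}
    // jsonObject is already a pointer, so pass it directly.
    if err := json.Unmarshal([]byte(val), jsonObject); err != nil {
        log.Println(err)
    }
})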
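
The configuration and handler steps above can be sketched with the standard library alone, without a running cluster. This is only an illustration: `envOrDefault`, `decodeOrder`, the `order` type, and the fallback values are hypothetical names that are not part of go-kafka, and the handler is invoked directly instead of being passed to Read.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
)

// order is a hypothetical payload type, used only for illustration.
type order struct {
	ID   int    `json:"id"`
	Item string `json:"item"`
}

// envOrDefault returns the value of the environment variable key,
// or fallback when the variable is unset or empty.
func envOrDefault(key, fallback string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return fallback
}

// decodeOrder parses a JSON message value, as a handler would receive it,
// into an order.
func decodeOrder(val string) (order, error) {
	var o order
	err := json.Unmarshal([]byte(val), &o)
	return o, err
}

func main() {
	// The fallback values are placeholders, not defaults defined by the package.
	topic := envOrDefault("KAFKA_TOPIC", "example-topic")
	brokerID := envOrDefault("KAFKA_BROKER_ID", "localhost:9092")
	fmt.Println("topic:", topic, "broker:", brokerID)

	// A function with this signature is what Read accepts as its handler.
	handler := func(val string) {
		o, err := decodeOrder(val)
		if err != nil {
			fmt.Println("skipping malformed message:", err)
			return
		}
		fmt.Printf("received order %d: %s\n", o.ID, o.Item)
	}
	handler(`{"id":1,"item":"book"}`)
	handler("not json")
}
```

Checking the decode error inside the handler keeps one malformed message from silently producing an empty object.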

Author

  • Atul Anand

Documentation

Constants

This section is empty.

Variables

var (
	// Contains the broker url for the kafka cluster.
	KafkaBrokerUrl string

	// Contains the topic used for interacting with the broker.
	KafkaTopic string
)
var Reader *kafka.Reader

Reader is the package-level reader used for reading from a Kafka topic. It must be configured before calling the Read method.

var Writer *kafka.Writer

Writer is the package-level writer used for writing to a Kafka topic. It must be configured before calling the Push method.

Functions

func ConfigureReader

func ConfigureReader(kafkaBrokerUrls []string, clientId string, topic string)

Configures the Kafka reader for the given topic, listening on the given broker URLs.

func ConfigureWriter

func ConfigureWriter(kafkaBrokerUrls []string, clientId string, topic string)

Configures the Kafka writer for the given topic, publishing to the given broker URLs.
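
Both ConfigureReader and ConfigureWriter take a slice of broker URLs. The package does not say how that slice should be built; assuming the addresses arrive as a single comma-separated string (for example from an environment variable), a small helper along these lines could produce it. `splitBrokers` is a hypothetical name and is not part of go-kafka.

```go
package main

import (
	"fmt"
	"strings"
)

// splitBrokers turns a comma-separated list of broker addresses into a
// slice, trimming surrounding whitespace and dropping empty entries.
func splitBrokers(s string) []string {
	var out []string
	for _, p := range strings.Split(s, ",") {
		if p = strings.TrimSpace(p); p != "" {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	brokers := splitBrokers("localhost:9092, localhost:9093")
	fmt.Println(brokers) // prints [localhost:9092 localhost:9093]
}
```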

func LoadConsumer

func LoadConsumer(brokerId string, topic string)

Configures the Reader to register a subscriber for the given Kafka broker and topic.

func LoadPublisher

func LoadPublisher(brokerId string, topic string)

Configures the Writer to register a publisher for the given Kafka broker and topic.

func Push

func Push(key, value []byte) (err error)

Publishes the given key and value as a message to a partition of the configured Kafka topic.
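
Push takes the key and value as byte slices, so the caller has to encode both. The following is a minimal sketch of preparing such a pair with the standard library; the `event` type, the `encodeEvent` helper, and the choice of the user ID as key are assumptions made for illustration, not something the package prescribes.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// event is a hypothetical payload type, used only for illustration.
type event struct {
	UserID int    `json:"user_id"`
	Action string `json:"action"`
}

// encodeEvent returns the key and value byte slices for e.
// The key is the decimal user ID; the value is the JSON encoding of e.
func encodeEvent(e event) (key, value []byte, err error) {
	value, err = json.Marshal(e)
	if err != nil {
		return nil, nil, err
	}
	return []byte(strconv.Itoa(e.UserID)), value, nil
}

func main() {
	key, value, err := encodeEvent(event{UserID: 42, Action: "login"})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	// The pair could now be handed to Push(key, value).
	fmt.Printf("key=%s value=%s\n", key, value)
}
```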

func Read

func Read(handler func(value string))

Listens to the configured Kafka topic and calls handler each time a message arrives in the partition.
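
The callback style of Read can be pictured with a small stand-in that needs no cluster. This is not the package's implementation, which is not shown here: the hypothetical `dispatch` function below uses a Go channel in place of a Kafka partition and simply calls the handler once per message, in arrival order.

```go
package main

import "fmt"

// dispatch calls handler once for every value received from msgs and
// returns when msgs is closed. The channel stands in for a Kafka partition.
func dispatch(msgs <-chan string, handler func(value string)) {
	for m := range msgs {
		handler(m)
	}
}

func main() {
	msgs := make(chan string, 3)
	msgs <- "first"
	msgs <- "second"
	msgs <- "third"
	close(msgs)

	// The handler has the same signature that Read accepts.
	var seen []string
	dispatch(msgs, func(value string) {
		seen = append(seen, value)
	})
	fmt.Println(seen) // prints [first second third]
}
```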

Types

This section is empty.
