vodfka

v0.2.2 Latest
Warning: This package is not in the latest version of its module.

Published: Jan 7, 2022 License: MIT Imports: 13 Imported by: 0

README

Vod-kafka 🍸

Consumer and producer wrapper for Kafka, with an API à la Gin Gonic

Getting Started

Import the module into your project

go get github.com/chefcookit/vod-kafka

  1. In your main() function, first configure your instance.
  2. Add your consumers.
  3. The last line of code in your main() function should be vodfka.Run(), which keeps your server running.

vodfka.With(vodfka.Config{
	KafkaBroker: "localhost:29092",
	Retries:     3,
	Logger:      logger, // your app logger
})

// Declare consumers ...
vodfka.Consume("my-topic", "my-consumer-group", Handler.HandleEvent)

// This should be the last line of code in the main() function if the server
// is not running any other service (HTTP server, etc.)
vodfka.Run()

Consumer example

// Assign the handler function in the main function
vodfka.Consume("my-topic", "my-consumer-group", Handler.HandleEvent)

// Implement the handler function in its own package
func (hdl *Handler) HandleEvent(c vodfka.Context) error {
	var event domain.Event
	err := c.BindJSON(&event)
	if err != nil {
		logger.Error("Could not consume Event")
		return err
	}
	// ... business logic
	return nil
}
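
Because a handler only depends on the Context interface, it can be exercised without a running broker. The following is a minimal sketch of that idea, not part of vodfka itself: the Context interface is copied from the Types section, while Event, fakeContext, and the "batch id must be set" rule are hypothetical names and logic introduced for illustration.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Context mirrors the vodfka.Context interface from the Types section.
type Context interface {
	BindJSON(interface{}) error
}

// Event is a hypothetical payload standing in for domain.Event.
type Event struct {
	BatchID string `json:"batchId"`
}

// fakeContext is a hypothetical test double that decodes a fixed payload.
type fakeContext struct {
	payload []byte
}

// BindJSON decodes the stored payload into v.
func (f fakeContext) BindJSON(v interface{}) error {
	return json.Unmarshal(f.payload, v)
}

// HandleEvent has the same shape as the handler above: bind, validate, act.
func HandleEvent(c Context) error {
	var event Event
	if err := c.BindJSON(&event); err != nil {
		return fmt.Errorf("could not consume event: %w", err)
	}
	// Hypothetical business rule, for illustration only.
	if event.BatchID == "" {
		return errors.New("event has no batch id")
	}
	return nil
}

func main() {
	valid := fakeContext{payload: []byte(`{"batchId":"b-1"}`)}
	broken := fakeContext{payload: []byte(`not json`)}

	fmt.Println("valid payload error:", HandleEvent(valid))
	fmt.Println("broken payload error:", HandleEvent(broken))
}
```

Returning the error from the handler, rather than only logging it, is what lets the caller decide whether to retry the message.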

Producer


// Initialise in main function then inject in dependencies
kafkaProducer := vodfka.GetProducer()

// This should be implemented in service layer
topic := "my-event-started"
key := "my unique key"

event := Event{
    BatchId:     batchId,
    PackageDate: packageDate,
}

if err := kafkaProducer.SendJSON(topic, key, event); err != nil {
    // ... handle error
}
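
Injecting the producer as a dependency means the service layer can be tested against an in-memory implementation. The sketch below assumes only the Producer interface shown in the Types section; memoryProducer, sentMessage, BatchService, and StartBatch are hypothetical names that do not exist in vodfka.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Producer mirrors the vodfka.Producer interface from the Types section.
type Producer interface {
	SendJSON(topic string, key string, obj interface{}) error
}

// sentMessage records one call to SendJSON (hypothetical helper type).
type sentMessage struct {
	Topic string
	Key   string
	Body  []byte
}

// memoryProducer is a hypothetical in-memory Producer for tests.
type memoryProducer struct {
	sent []sentMessage
}

// SendJSON marshals obj and stores it instead of contacting a broker.
func (p *memoryProducer) SendJSON(topic, key string, obj interface{}) error {
	body, err := json.Marshal(obj)
	if err != nil {
		return err
	}
	p.sent = append(p.sent, sentMessage{Topic: topic, Key: key, Body: body})
	return nil
}

// Event stands in for the event type used in the producer example.
type Event struct {
	BatchId     string
	PackageDate string
}

// BatchService is a hypothetical service that receives its Producer by injection.
type BatchService struct {
	producer Producer
}

// StartBatch publishes a "started" event keyed by the batch id.
func (s *BatchService) StartBatch(batchID, packageDate string) error {
	event := Event{BatchId: batchID, PackageDate: packageDate}
	return s.producer.SendJSON("my-event-started", batchID, event)
}

func main() {
	mem := &memoryProducer{}
	svc := &BatchService{producer: mem}

	if err := svc.StartBatch("b-1", "2022-01-07"); err != nil {
		fmt.Println("send failed:", err)
		return
	}
	fmt.Println(len(mem.sent), mem.sent[0].Topic, string(mem.sent[0].Body))
}
```

In production code the same BatchService would presumably be constructed with the value returned by vodfka.GetProducer().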

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Consume

func Consume(topic string, consumerGroup string, handler HandlerFunc)

Adds a handler that consumes messages from the given Kafka topic as part of the given consumer group.

func Default

func Default()

Initializes vodfka with the default configuration. Not recommended.

func Run (added in v0.2.0)

func Run()

Runs the server and keeps the process alive until an OS interrupt is received, then performs a graceful shutdown.
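
For readers unfamiliar with the pattern, blocking until an interrupt and then cleaning up is commonly written in Go with signal.NotifyContext. The sketch below illustrates that general pattern only; it is not vodfka's implementation, and waitForStop and the cleanup message are hypothetical. The 200 ms timeout exists solely so the demo terminates when run unattended.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"time"
)

// waitForStop blocks until done is closed, then runs cleanup.
func waitForStop(done <-chan struct{}, cleanup func()) {
	<-done
	cleanup()
}

func main() {
	// ctx is cancelled when the process receives an interrupt (Ctrl-C).
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	// Demo only: also stop after 200ms so the example exits on its own.
	ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
	defer cancel()

	waitForStop(ctx.Done(), func() {
		// A real server would close consumers and flush producers here.
		fmt.Println("shutting down gracefully")
	})
}
```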

func With

func With(config Config)

Initializes vodfka with a custom configuration.

Types

type Config

type Config struct {
	KafkaBroker string
	Logger      *zap.SugaredLogger
	Retries     int
}

Configuration for running the app.

  - Retries: number of times to retry handling a message consumed on a topic before it is dumped
  - Logger: logger instance to be used by Vodfka
  - KafkaBroker: Kafka broker host and port, e.g. "localhost:9092"
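
To make the Retries setting concrete, here is a minimal sketch of a bounded retry loop. It assumes that Retries counts attempts made after the first failed one (so Retries: 3 allows up to four calls); the documentation does not state this, and vodfka may count differently. handleWithRetries and errTransient are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// errTransient is a hypothetical error used to simulate a failing handler.
var errTransient = errors.New("transient failure")

// handleWithRetries calls handle once, then retries up to `retries` more
// times while it keeps failing. It returns the number of calls made and
// the last error (nil on success).
func handleWithRetries(retries int, handle func() error) (int, error) {
	if retries < 0 {
		retries = 0
	}
	var err error
	for attempt := 1; attempt <= retries+1; attempt++ {
		if err = handle(); err == nil {
			return attempt, nil
		}
	}
	return retries + 1, err
}

func main() {
	calls := 0
	// Fails twice, then succeeds on the third call.
	flaky := func() error {
		calls++
		if calls < 3 {
			return errTransient
		}
		return nil
	}

	attempts, err := handleWithRetries(3, flaky)
	fmt.Println("attempts:", attempts, "error:", err)
}
```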

type Context

type Context interface {
	BindJSON(interface{}) error
}

Context is passed to consumer handlers and allows them to retrieve information about the incoming message.

type HandlerFunc

type HandlerFunc func(Context) error

Contains the business logic that will handle messages received by the topic consumer

type Producer

type Producer interface {
	SendJSON(topic string, key string, obj interface{}) error
}

Used to send messages to the Kafka cluster.

func GetProducer

func GetProducer() Producer

Gets the singleton instance of Producer. Initializes a new one if none exists.
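
A lazily created singleton like this is usually built in Go with sync.Once. The sketch below shows that general technique under stated assumptions; it is not vodfka's source, and noopProducer, getProducer, and the created counter are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// Producer mirrors the vodfka.Producer interface documented above.
type Producer interface {
	SendJSON(topic string, key string, obj interface{}) error
}

// noopProducer is a hypothetical stand-in for a Kafka-backed producer.
type noopProducer struct {
	id int
}

// SendJSON does nothing; a real producer would publish to the broker.
func (*noopProducer) SendJSON(topic, key string, obj interface{}) error {
	return nil
}

var (
	producerOnce sync.Once
	producer     Producer
	created      int // counts constructions, for demonstration only
)

// getProducer returns the shared instance, creating it on first use.
// sync.Once guarantees the constructor runs once, even under concurrency.
func getProducer() Producer {
	producerOnce.Do(func() {
		created++
		producer = &noopProducer{id: created}
	})
	return producer
}

func main() {
	a, b := getProducer(), getProducer()
	fmt.Println("same instance:", a == b, "constructions:", created)
}
```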
