kafka

Version: v1.3.83
Published: May 15, 2026 License: Apache-2.0 Imports: 6 Imported by: 0

README

Kafka Utilities

The kafka package provides utilities to interact with Kafka, including configuration management, message production, and message consumption. It simplifies the process of integrating Kafka into your Go applications.

Features

  • Load Kafka configuration from environment variables.
  • Validate Kafka configuration.
  • Initialize Kafka readers and writers with SASL authentication.
  • Consume messages from a single partition or as part of a consumer group.

Configuration

The Kafka configuration is loaded using environment variables. The following variables are supported:

Environment Variable   Description                  Required   Default Value
KAFKA_ADDRESS          Kafka broker address         Yes        -
KAFKA_TOPIC            Kafka topic name             Yes        -
KAFKA_USERNAME         SASL username                Yes        -
KAFKA_PASSWORD         SASL password                Yes        -
KAFKA_GROUPID          Consumer group ID            No         default-group
KAFKA_PARTITION        Partition number (numeric)   No         0
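
To make the required and default columns concrete, here is a minimal standard-library sketch of the same rules. It is not the package's implementation (LoadConfig is documented as using Viper); envConfig, loadFrom, and the error wording are hypothetical names chosen for illustration.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// envConfig is a hypothetical stand-in for the package's KafkaConfig.
// Only the six variables listed in the table are modeled.
type envConfig struct {
	Address, Topic, Username, Password, GroupID, Partition string
}

// loadFrom builds a config from a lookup function such as os.Getenv.
// Optional variables fall back to the defaults from the table, and all
// missing required variables are reported in a single error.
func loadFrom(getenv func(string) string) (*envConfig, error) {
	withDefault := func(key, fallback string) string {
		if v := getenv(key); v != "" {
			return v
		}
		return fallback
	}
	cfg := &envConfig{
		Address:   getenv("KAFKA_ADDRESS"),
		Topic:     getenv("KAFKA_TOPIC"),
		Username:  getenv("KAFKA_USERNAME"),
		Password:  getenv("KAFKA_PASSWORD"),
		GroupID:   withDefault("KAFKA_GROUPID", "default-group"),
		Partition: withDefault("KAFKA_PARTITION", "0"),
	}
	required := []struct{ name, value string }{
		{"KAFKA_ADDRESS", cfg.Address},
		{"KAFKA_TOPIC", cfg.Topic},
		{"KAFKA_USERNAME", cfg.Username},
		{"KAFKA_PASSWORD", cfg.Password},
	}
	var missing []string
	for _, r := range required {
		if r.value == "" {
			missing = append(missing, r.name)
		}
	}
	if len(missing) > 0 {
		return nil, fmt.Errorf("missing required variables: %s", strings.Join(missing, ", "))
	}
	return cfg, nil
}

func main() {
	cfg, err := loadFrom(os.Getenv)
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Printf("broker=%s topic=%s group=%s partition=%s\n",
		cfg.Address, cfg.Topic, cfg.GroupID, cfg.Partition)
}
```

Taking the lookup as a function rather than calling os.Getenv directly keeps the sketch easy to exercise without touching the process environment.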

Usage

Loading Configuration

Use the LoadConfig function to load and validate Kafka configuration:

config, err := kafka.LoadConfig()
if err != nil {
    log.Fatalf("Failed to load Kafka config: %v", err)
}

Initializing a Kafka Reader

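Use the InitializeKafkaReader function to create a Kafka reader. Per its documented signature, it takes the configuration, a separator string, and an optional topic suffix:

// The second argument is the separator required by the documented signature.
// "." is an assumed value, chosen to match the "." in the InitializeKafkaWriter documentation.
reader, err := kafka.InitializeKafkaReader(config, ".")
if err != nil {
    log.Fatalf("Failed to initialize Kafka reader: %v", err)
}
defer reader.Close()

Initializing a Kafka Writer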

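Use the InitializeKafkaWriter function to create a Kafka writer. It takes the same separator and optional topic suffix arguments as InitializeKafkaReader:

// "." is an assumed separator value; the topic suffix is omitted.
writer, err := kafka.InitializeKafkaWriter(config, ".")
if err != nil {
    log.Fatalf("Failed to initialize Kafka writer: %v", err)
}
defer writer.Close()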

Example

Below is an example of producing and consuming messages using the kafka package:

package main

import (
	"context"
	"log"

	"github.com/MyCarrier-DevOps/goLibMyCarrier/kafka"
	// Assumption: the reader and writer returned by this package are
	// github.com/segmentio/kafka-go types, so Message comes from that module.
	kafkago "github.com/segmentio/kafka-go"
)

func main() {
	// Load and validate the Kafka configuration from KAFKA_* variables.
	config, err := kafka.LoadConfig()
	if err != nil {
		log.Fatalf("Failed to load Kafka config: %v", err)
	}

	// Initialize the Kafka writer. "." is an assumed separator value;
	// the documented signature requires the argument.
	writer, err := kafka.InitializeKafkaWriter(config, ".")
	if err != nil {
		log.Fatalf("Failed to initialize Kafka writer: %v", err)
	}
	defer writer.Close()

	// Produce a message.
	err = writer.WriteMessages(context.Background(), kafkago.Message{
		Value: []byte("Hello, Kafka!"),
	})
	if err != nil {
		log.Fatalf("Failed to write message: %v", err)
	}

	// Initialize the Kafka reader with the same assumed separator.
	reader, err := kafka.InitializeKafkaReader(config, ".")
	if err != nil {
		log.Fatalf("Failed to initialize Kafka reader: %v", err)
	}
	defer reader.Close()

	// Consume a message.
	msg, err := reader.ReadMessage(context.Background())
	if err != nil {
		log.Fatalf("Failed to read message: %v", err)
	}
	log.Printf("Received message: %s", string(msg.Value))
}

Notes

  • Ensure that the Kafka broker supports SASL authentication if using KAFKA_USERNAME and KAFKA_PASSWORD.
  • The KAFKA_PARTITION variable is only used when KAFKA_GROUPID is not set.

For more details, refer to the Segment Kafka Go documentation.
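
The second note above describes a choice between two consumption modes. A rough sketch of that decision follows, using only the standard library. The names readerTarget and chooseTarget are hypothetical, and the rejection of negative partitions is an assumption, not documented package behavior.

```go
package main

import (
	"fmt"
	"strconv"
)

// readerTarget is a hypothetical description of what a reader consumes from.
type readerTarget struct {
	GroupID   string // non-empty when consuming as part of a consumer group
	Partition int    // meaningful only when GroupID is empty
}

// chooseTarget applies the rule from the notes: a group ID takes precedence,
// and the partition string is parsed only when no group ID is set.
func chooseTarget(groupID, partition string) (readerTarget, error) {
	if groupID != "" {
		return readerTarget{GroupID: groupID}, nil
	}
	if partition == "" {
		partition = "0" // default from the configuration table
	}
	p, err := strconv.Atoi(partition)
	if err != nil {
		return readerTarget{}, fmt.Errorf("KAFKA_PARTITION must be numeric: %w", err)
	}
	if p < 0 {
		// Assumption: negative partition numbers are treated as invalid.
		return readerTarget{}, fmt.Errorf("KAFKA_PARTITION must not be negative, got %d", p)
	}
	return readerTarget{Partition: p}, nil
}

func main() {
	for _, in := range [][2]string{{"billing", "3"}, {"", "3"}, {"", "abc"}} {
		target, err := chooseTarget(in[0], in[1])
		fmt.Printf("group=%q partition=%q -> %+v, err=%v\n", in[0], in[1], target, err)
	}
}
```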

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func InitializeKafkaReader

func InitializeKafkaReader(kafkacfg *KafkaConfig, separator string, suffix ...string) (*kafka.Reader, error)

InitializeKafkaReader initializes a Kafka reader with the provided configuration.

func InitializeKafkaWriter

func InitializeKafkaWriter(kafkacfg *KafkaConfig, separator string, suffix ...string) (*kafka.Writer, error)

InitializeKafkaWriter initializes a Kafka writer with the provided configuration and optional topic suffix. If KAFKA_TOPIC is set, the suffix is ignored and the topic is used directly. If KAFKA_TOPIC_PREFIX is set and KAFKA_TOPIC is not set, the topic will be KAFKA_TOPIC_PREFIX + "." + suffix. The suffix parameter is optional - pass an empty string or omit it for backward compatibility.
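
The topic rules described above can be sketched as a small pure function. This is an illustration only, not the package's code: resolveTopic is a hypothetical helper, and returning an error when a prefix is set without a suffix, or when neither value is set, is an assumption, since the documentation does not say what happens in those cases.

```go
package main

import (
	"errors"
	"fmt"
)

// resolveTopic is a hypothetical helper that mirrors the documented rules:
// an explicit topic wins and the suffix is ignored; otherwise the topic is
// built as prefix + separator + suffix.
func resolveTopic(topic, prefix, separator string, suffix ...string) (string, error) {
	if topic != "" {
		return topic, nil
	}
	if prefix == "" {
		// Assumption: with neither value set there is nothing to resolve.
		return "", errors.New("neither topic nor topic prefix is set")
	}
	if len(suffix) == 0 || suffix[0] == "" {
		// Assumption: a prefix without a suffix is treated as an error here.
		return "", errors.New("topic prefix is set but no suffix was given")
	}
	return prefix + separator + suffix[0], nil
}

func main() {
	// Explicit topic: the suffix is ignored.
	fmt.Println(resolveTopic("orders", "ci", ".", "events"))
	// Prefix and suffix only: joined with the separator.
	fmt.Println(resolveTopic("", "ci", ".", "events"))
	// Suffix omitted, as the variadic parameter allows.
	fmt.Println(resolveTopic("orders", "", "."))
}
```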

Types

type KafkaConfig

type KafkaConfig struct {
	Address            string `mapstructure:"address"`
	Topic              string `mapstructure:"topic"`
	TopicPrefix        string `mapstructure:"topic_prefix"`
	Username           string `mapstructure:"username"`
	Password           string `mapstructure:"password"`
	GroupID            string `mapstructure:"groupid"`
	Partition          string `mapstructure:"partition"`
	InsecureSkipVerify string `mapstructure:"insecure_skip_verify"`
	// StartOffset determines where a new consumer group starts reading.
	// Valid values: "first" (default, oldest messages) or "last" (newest messages only).
	// Only applies when GroupID is set and the consumer group has no committed offsets.
	StartOffset string `mapstructure:"start_offset"`
}

KafkaConfig represents the configuration for Kafka.
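
Several KafkaConfig fields are strings that stand for typed values. The sketch below shows one way StartOffset and InsecureSkipVerify could be interpreted; it is an assumption about the conversion, not the package's code. parseStartOffset and parseSkipVerify are hypothetical names, the two constants mirror the FirstOffset and LastOffset values defined by segmentio/kafka-go, and treating an empty InsecureSkipVerify as false is an assumption.

```go
package main

import (
	"fmt"
	"strconv"
)

// Local copies of the kafka-go offset constants, so the sketch has no
// third-party dependency.
const (
	lastOffset  int64 = -1 // newest messages only
	firstOffset int64 = -2 // oldest available messages
)

// parseStartOffset maps the documented StartOffset values to an offset.
// An empty string is treated as "first", the documented default.
func parseStartOffset(s string) (int64, error) {
	switch s {
	case "", "first":
		return firstOffset, nil
	case "last":
		return lastOffset, nil
	default:
		return 0, fmt.Errorf("invalid start offset %q: want \"first\" or \"last\"", s)
	}
}

// parseSkipVerify interprets InsecureSkipVerify as a boolean.
// Assumption: an empty value means certificate verification stays enabled.
func parseSkipVerify(s string) (bool, error) {
	if s == "" {
		return false, nil
	}
	return strconv.ParseBool(s)
}

func main() {
	offset, err := parseStartOffset("last")
	fmt.Println(offset, err)

	skip, err := parseSkipVerify("true")
	fmt.Println(skip, err)
}
```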

func LoadConfig

func LoadConfig() (*KafkaConfig, error)

LoadConfig loads the configuration from environment variables using Viper. An isolated viper instance is used to prevent cross-package state pollution.
