kafka

package module
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 6, 2025 License: MIT Imports: 8 Imported by: 0

README

Things-Kit Kafka

Kafka Consumer Module for Things-Kit

Lifecycle-managed Kafka consumer implementation for Things-Kit applications.

Installation

go get github.com/things-kit/things-kit-kafka

Features

  • Automatic Kafka consumer lifecycle management
  • Implements the messaging.Consumer interface
  • Configuration via Viper or environment variables
  • Graceful shutdown
  • Message handler registration via dependency injection

Quick Start

package main

import (
    "context"
    "github.com/things-kit/things-kit/app"
    "github.com/things-kit/things-kit/logging"
    "github.com/things-kit/things-kit/viperconfig"
    "github.com/things-kit/things-kit-kafka"
    "github.com/things-kit/things-kit-messaging"
)

func main() {
    app.New(
        viperconfig.Module,
        logging.Module,
        kafka.ConsumerModule,
        kafka.AsMessageHandler(NewUserEventHandler),
    ).Run()
}

type UserEventHandler struct {
    // your dependencies
}

func NewUserEventHandler() *UserEventHandler {
    return &UserEventHandler{}
}

func (h *UserEventHandler) Handle(ctx context.Context, msg messaging.Message) error {
    // process message
    return nil
}

func (h *UserEventHandler) Topic() string {
    return "user-events"
}

Configuration

Via config.yaml:

kafka:
  brokers:
    - localhost:9092
  group_id: my-consumer-group
  auto_offset_reset: earliest

Or environment variables:

export KAFKA_BROKERS=localhost:9092,broker2:9092
export KAFKA_GROUP_ID=my-consumer-group
export KAFKA_AUTO_OFFSET_RESET=earliest

License

MIT License - see LICENSE file for details

Documentation

Overview

Package kafka provides Kafka consumer implementation for Things-Kit applications. It implements the messaging.Consumer interface.

Index

Constants

This section is empty.

Variables

View Source
var ConsumerModule = fx.Module("kafka-consumer",
	fx.Provide(
		NewConfig,
		NewKafkaConsumer,

		fx.Annotate(
			func(c *KafkaConsumer) messaging.Consumer { return c },
			fx.As(new(messaging.Consumer)),
		),
	),
	fx.Invoke(RunConsumer),
)

ConsumerModule provides the Kafka consumer module to the application.

Functions

func RunConsumer

func RunConsumer(p ConsumerParams, consumer *KafkaConsumer)

RunConsumer starts the Kafka consumer with lifecycle management.

Types

type Config

type Config struct {
	Brokers  []string      `mapstructure:"brokers"`
	Topic    string        `mapstructure:"topic"`
	GroupID  string        `mapstructure:"group_id"`
	MaxWait  time.Duration `mapstructure:"max_wait"`
	MinBytes int           `mapstructure:"min_bytes"`
	MaxBytes int           `mapstructure:"max_bytes"`
}

Config holds the Kafka consumer configuration.

func NewConfig

func NewConfig(v *viper.Viper) *Config

NewConfig creates a new Kafka configuration from Viper.

type ConsumerParams

type ConsumerParams struct {
	fx.In
	Lifecycle fx.Lifecycle
	Logger    log.Logger
	Config    *Config
	Handler   messaging.Handler
}

ConsumerParams contains all dependencies needed to run the Kafka consumer.

type KafkaConsumer

type KafkaConsumer struct {
	// contains filtered or unexported fields
}

KafkaConsumer implements the messaging.Consumer interface using Kafka.

func NewKafkaConsumer

func NewKafkaConsumer(cfg *Config, handler messaging.Handler, logger log.Logger) *KafkaConsumer

NewKafkaConsumer creates a new Kafka consumer.

func (*KafkaConsumer) Start

func (c *KafkaConsumer) Start(ctx context.Context) error

Start begins consuming messages from Kafka.

func (*KafkaConsumer) Stop

func (c *KafkaConsumer) Stop(ctx context.Context) error

Stop gracefully shuts down the Kafka consumer.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL