kafka

package
v1.8.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 23, 2026 License: MIT Imports: 13 Imported by: 0

README

kafka

Wrapper around github.com/twmb/franz-go (kgo) that provides three lifecycle-conformant types:

  • Client — base wrapper, for callers that build their own producer/consumer logic.
  • Producer — synchronous single-record producer (Resource).
  • Consumer — manual-commit consumer group host (Runner).

Plus a generic JSONProducer[T] helper for encoding typed payloads.

Wraps github.com/twmb/franz-go (kgo).

When to use

Any service that produces or consumes Kafka messages.

Quickstart

package main

import (
    "context"
    "log"

    "github.com/sergeyslonimsky/core/app"
    "github.com/sergeyslonimsky/core/kafka"
)

type orderProcessor struct{}

func (orderProcessor) Process(ctx context.Context, msg kafka.Message) error {
    log.Printf("got %s/%s: %s", msg.Topic, msg.Key, msg.Message)
    return nil
}

func main() {
    ctx := context.Background()

    producer, err := kafka.NewProducer(kafka.ProducerConfig{
        Brokers: []string{"kafka:9092"},
        Topic:   "orders",
    }, kafka.WithOtel())
    if err != nil { log.Fatal(err) }

    consumer, err := kafka.NewConsumer(kafka.ConsumerConfig{
        Brokers: []string{"kafka:9092"},
        Group:   "order-handler",
        Topics:  []string{"orders"},
        Offset:  "newest",
    }, orderProcessor{}, kafka.WithOtel())
    if err != nil { log.Fatal(err) }

    a := app.New()
    a.Add(producer)   // Resource
    a.Add(consumer)   // Runner
    log.Fatal(a.Run())
}

Configuration

type ClientConfig struct {
    Brokers  []string
    ClientID string
}

type ConsumerConfig struct {
    Brokers []string
    Group   string
    Offset  string  // "" / "newest" / "oldest"
    Topics  []string
}

type ProducerConfig struct {
    Brokers []string
    Topic   string  // informational; Produce takes the topic per call
}

App-level mapping example:

kafka.ProducerConfig{
    Brokers: raw.GetStringSlice("kafka.brokers"),
    Topic:   raw.GetString("kafka.producers.orders.topic"),
}

Brokers is []string. Use raw.GetStringSlice or split your own string.

Options

Same Option type works with NewClient, NewProducer, NewConsumer — irrelevant fields are silently ignored:

  • WithLogger(*slog.Logger) — used for lifecycle events.
  • WithOtel() — wires kotel hooks for traces and meter metrics.
  • WithKgoOpts(...kgo.Opt) — escape hatch for any kgo feature without a typed wrapper (custom partitioner, SASL, TLS, etc).
  • WithProduceTimeout(d time.Duration) — applies to NewProducer only. Default: 30s.
  • WithErrorHandler(h ErrorHandler) — applies to NewConsumer only. Controls what happens when MessageProcessor.Process returns an error. Default (no handler): log and skip (the offset advances past the failing record). See "Consumer error handling" below.

Consumer error handling

By default, a failing Process call is logged and the record is committed — the consumer keeps moving. That is fine for poison-pill-tolerant pipelines, but risky when a transient downstream outage would silently discard messages.

WithErrorHandler lets you inject a decision function that returns one of:

  • kafka.ErrorActionSkip — log + commit the record (default behavior).
  • kafka.ErrorActionRetry — do NOT commit. The record stays uncommitted and is eligible for redelivery on the next rebalance or consumer restart. For in-process retry the handler should sleep/backoff before returning.
  • kafka.ErrorActionStop — terminate the consumer: Run returns the wrapped error, which propagates up to app.App and triggers service shutdown.

Example — DLQ on first failure, fail-fast on dependency outage:

consumer, err := kafka.NewConsumer(cfg, processor,
    kafka.WithOtel(),
    kafka.WithErrorHandler(func(ctx context.Context, msg kafka.Message, err error) kafka.ErrorAction {
        if errors.Is(err, ErrDependencyDown) {
            return kafka.ErrorActionStop
        }
        _ = dlq.Produce(msg.Topic, msg.Key, msg.Message)
        return kafka.ErrorActionSkip
    }),
)

Observability (WithOtel)

Constructs a kotel.Kotel with the global tracer/meter providers and attaches its hooks via kgo.WithHooks:

  • One span per produce.
  • One span per consume batch.
  • Per-broker meter metrics (latency, throughput).

Imports github.com/twmb/franz-go/plugin/kotel. Call otel.Setup and register the provider with app.App before constructing the kafka components.

Lifecycle

  • Client, Producerlifecycle.Resource. Register with app.App.Add(...). Shutdown closes the underlying *kgo.Client (blocking until in-flight produces drain).
  • Consumerlifecycle.Runner. Register with app.App.Add(...). Run polls until ctx is cancelled or Shutdown is called.
  • Client.Healthcheck — returns nil if the underlying client has discovered at least one broker.

Recommended ordering inside app.App:

a.Add(otelProvider)        // Resource, registered first → shuts down last
a.Add(kafkaProducer)       // Resource
a.Add(kafkaConsumer)       // Runner

Generic JSON producer

type Order struct { ID int; Total float64 }

orderProducer := kafka.NewJSONProducer[Order]("orders", "default-key", producer)
err := orderProducer.Produce(ctx, Order{ID: 1, Total: 99.95})

NewJSONProducer accepts any kafka.Publisher — typically *kafka.Producer, but any custom type implementing Produce(topic, key string, msg []byte) error works (handy for fakes in tests).

Extending

raw := client.Unwrap()  // *kgo.Client
admin := kadm.NewClient(raw)  // for admin operations

Testing

Unit tests use the included MockPublisher for JSONProducer.

Integration tests live in internal/integration/.

See also

  • core/app — register your kafka components here.
  • core/lifecycle — the Resource / Runner / Healthchecker contracts.
  • core/otel — bootstrap that powers WithOtel.

Documentation

Overview

Package kafka wraps github.com/twmb/franz-go (kgo) with the lifecycle contract used across core: typed Configs, functional Options, Shutdown and Healthcheck methods, and an opt-in OpenTelemetry instrumentation pipeline via the kotel plugin.

The package exposes three primary types:

  • Client (Resource) — base kgo.Client wrapper for callers that build their own producer/consumer logic.
  • Producer (Resource) — synchronous single-record producer.
  • Consumer (Runner) — consumer group with manual-commit semantics.

All three implement lifecycle.Resource (and Consumer additionally implements lifecycle.Runner), so they can be registered with app.App.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrEmptyBrokersList is returned when ClientConfig/ConsumerConfig/
	// ProducerConfig is constructed without any seed brokers.
	ErrEmptyBrokersList = errors.New("brokers list cannot be empty")

	// ErrNoBrokersDiscovered is returned by Client.Healthcheck when the
	// underlying kgo.Client has not discovered any brokers yet (the seed
	// brokers were unreachable, or no metadata fetch has completed).
	ErrNoBrokersDiscovered = errors.New("no brokers discovered")

	// ErrInvalidOffset is returned by NewConsumer when ConsumerConfig.Offset
	// is set to a value other than "" / "newest" / "oldest".
	ErrInvalidOffset = errors.New("invalid offset")

	// ErrNilProcessor is returned by NewConsumer when the MessageProcessor
	// argument is nil.
	ErrNilProcessor = errors.New("processor cannot be nil")

	// ErrNilConsumerGroup is returned by NewConsumer when ConsumerConfig.Group
	// is empty.
	ErrNilConsumerGroup = errors.New("consumer group cannot be empty")

	// ErrEmptyTopic is returned by NewConsumer when ConsumerConfig.Topics
	// is empty.
	ErrEmptyTopic = errors.New("topics list cannot be empty")
)

Sentinel errors returned by kafka constructors and methods. Stable identity so callers can errors.Is them.

Functions

This section is empty.

Types

type Client added in v1.3.0

type Client struct {
	// contains filtered or unexported fields
}

Client is a base wrapper around *kgo.Client. Use this when you need to build your own producer or consumer logic on top of franz-go primitives. For ready-made producers and consumers, see Producer and Consumer.

Implements lifecycle.Resource and lifecycle.Healthchecker.

func NewClient added in v1.3.0

func NewClient(_ context.Context, cfg ClientConfig, opts ...Option) (*Client, error)

NewClient creates a base franz-go client.

func (*Client) Healthcheck added in v1.3.0

func (c *Client) Healthcheck(_ context.Context) error

Healthcheck verifies that the client has discovered at least one broker. Implements lifecycle.Healthchecker.

func (*Client) Shutdown added in v1.3.0

func (c *Client) Shutdown(_ context.Context) error

Shutdown closes the underlying client. Implements lifecycle.Resource.

Idempotent and concurrent-safe: the close runs exactly once.

kgo.Client.Close blocks until in-flight produces drain. The ctx is currently advisory — kgo does not honor it directly — but the signature matches lifecycle.Resource for uniform integration with app.App.

func (*Client) Unwrap added in v1.3.0

func (c *Client) Unwrap() *kgo.Client

Unwrap returns the underlying *kgo.Client. Use for advanced operations (admin, transactions, custom hooks). Owned by Client — don't Close it directly; always go through Shutdown.

type ClientConfig added in v1.3.0

type ClientConfig struct {
	// Brokers is the seed broker list. Required.
	Brokers []string

	// ClientID is the kgo.ClientID. Optional.
	ClientID string
}

ClientConfig describes the seed brokers and client identity for a Client.

type Consumer added in v1.3.0

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a kafka consumer-group host backed by a single franz-go client. Implements lifecycle.Runner — register with app.App.

Uses manual commit (DisableAutoCommit + MarkCommitRecords + CommitMarkedOffsets) so the offset-advance decision is driven by the configured ErrorHandler rather than by franz-go's auto-commit.

Default error policy (no WithErrorHandler): log and skip failing records. See WithErrorHandler + ErrorAction for fine-grained control (DLQ, retry, fail-fast).

func NewConsumer added in v1.3.0

func NewConsumer(
	cfg ConsumerConfig,
	processor MessageProcessor,
	opts ...Option,
) (*Consumer, error)

NewConsumer creates a consumer-group client.

cfg.Brokers, cfg.Group, and cfg.Topics are required; cfg.Offset defaults to "newest" when empty (other valid value: "oldest").

func (*Consumer) Run added in v1.3.0

func (c *Consumer) Run(ctx context.Context) error

Run blocks polling for messages until ctx is cancelled (graceful) or the configured ErrorHandler returns ErrorActionStop (fatal).

Returns nil on ctx cancellation. Returns a wrapping error when ErrorActionStop is selected by the handler.

At-least-once semantics: on ErrorActionStop, records that were processed successfully earlier in the same fetch batch are NOT committed — they will be redelivered on restart. Processors must be idempotent to handle this (as they must for any Kafka consumer-group restart).

Implements lifecycle.Runner.

func (*Consumer) Shutdown added in v1.3.0

func (c *Consumer) Shutdown(_ context.Context) error

Shutdown closes the underlying client, which unblocks PollFetches in Run and drains in-flight fetches. Safe to call multiple times. Implements lifecycle.Resource.

The ctx is currently advisory — kgo.Client.Close does not honor it.

type ConsumerConfig

type ConsumerConfig struct {
	// Brokers is the seed broker list.
	Brokers []string

	// Group is the consumer group ID. Required.
	Group string

	// Offset selects the initial offset on first read of a partition:
	// "newest" / "" (default) or "oldest".
	Offset string

	// Topics is the list of topics to subscribe to. Required.
	Topics []string
}

ConsumerConfig describes a consumer group's connection parameters. Plain fields, no struct tags — consumer apps map their viper keys to fields explicitly inside their own config.NewConfig().

type ContextPublisher added in v1.3.0

type ContextPublisher[T any] interface {
	Produce(ctx context.Context, message T) error
}

ContextPublisher is a typed publisher (e.g., JSONProducer[T]) that takes a typed message and a context.

type ErrorAction added in v1.3.0

type ErrorAction int

ErrorAction selects what the Consumer should do after MessageProcessor.Process returns a non-nil error. Passed back from a caller-supplied ErrorHandler.

const (
	// ErrorActionSkip logs the error and advances the consumer-group offset
	// past the failing record. Use for poison-pill tolerant pipelines where
	// DLQ is handled by the processor itself.
	//
	// This is the default when no ErrorHandler is configured, preserving the
	// pre-existing behavior.
	ErrorActionSkip ErrorAction = iota

	// ErrorActionRetry does NOT commit the record. The next PollFetches may
	// redeliver it (after a rebalance or client restart). For in-process
	// retry, the caller's ErrorHandler should sleep/backoff before returning.
	//
	// Note: kgo does not re-fetch uncommitted records within the same
	// session automatically — retry here effectively means "let the next
	// rebalance replay it". For tight retries the processor should loop
	// internally and only return Retry after exhausting its budget.
	ErrorActionRetry

	// ErrorActionStop terminates the Consumer: Run returns the original
	// processing error wrapped. The record is NOT committed. Use for fatal
	// conditions where the service should not advance past the failure
	// (e.g., dependency outage that must be surfaced to the operator).
	ErrorActionStop
)

type ErrorHandler added in v1.3.0

type ErrorHandler func(ctx context.Context, msg Message, err error) ErrorAction

ErrorHandler is invoked when MessageProcessor.Process returns a non-nil error. It decides what the consumer should do with the failing record.

Implementations typically: emit to a DLQ and return ErrorActionSkip; increment a metric and return ErrorActionSkip; or on fatal errors return ErrorActionStop.

type JSONProducer added in v1.3.0

type JSONProducer[T any] struct {
	// contains filtered or unexported fields
}

JSONProducer wraps a Publisher and JSON-encodes T messages on every call. Generic over T to give compile-time payload-type safety.

func NewJSONProducer added in v1.3.0

func NewJSONProducer[T any](topic, defaultKey string, producer Publisher) *JSONProducer[T]

NewJSONProducer wraps the given Publisher (typically *Producer) and always publishes to the configured topic.

func (*JSONProducer[T]) Produce added in v1.3.0

func (p *JSONProducer[T]) Produce(ctx context.Context, message T) error

Produce encodes message as JSON and sends it with the constructor's defaultKey.

func (*JSONProducer[T]) ProduceWithKey added in v1.3.0

func (p *JSONProducer[T]) ProduceWithKey(_ context.Context, key string, message T) error

ProduceWithKey encodes message as JSON and sends it with the given key.

type Message added in v1.3.0

type Message struct {
	Topic   Topic
	Key     []byte
	Message []byte
}

Message is the per-record payload delivered to MessageProcessor.Process. The Key and Message byte slices are reused by the underlying fetch loop after Process returns — copy them if you need to retain.

type MessageProcessor added in v1.3.0

type MessageProcessor interface {
	Process(ctx context.Context, message Message) error
}

MessageProcessor is what callers implement to handle messages delivered by a Consumer. Process returns nil on success; a non-nil error is logged but does NOT halt the consumer.

type Option added in v1.3.0

type Option func(*options)

Option configures any of the kafka constructors (NewClient, NewProducer, NewConsumer). Not every option applies to every constructor — irrelevant fields are silently ignored.

func WithErrorHandler added in v1.3.0

func WithErrorHandler(h ErrorHandler) Option

WithErrorHandler configures how NewConsumer reacts to a MessageProcessor error. Without this option, the Consumer logs and skips failing records (ErrorActionSkip) — which silently advances the consumer-group offset.

Typical usages:

  • Return ErrorActionSkip after pushing the payload to a DLQ topic.
  • Return ErrorActionStop for unrecoverable dependency outages so the service fails loudly instead of losing messages.
  • Return ErrorActionRetry to leave the record uncommitted (next rebalance redelivers it).

Ignored by NewClient and NewProducer.

func WithKgoOpts added in v1.3.0

func WithKgoOpts(opts ...kgo.Opt) Option

WithKgoOpts appends raw kgo.Opt values to the underlying franz-go client. Escape hatch for tuning kgo features that don't have a typed wrapper here (e.g., custom partitioner, sasl, tls, etc).

func WithLogger added in v1.3.0

func WithLogger(l *slog.Logger) Option

WithLogger attaches a *slog.Logger used for lifecycle events. Defaults to slog.Default() when omitted.

func WithOtel added in v1.3.0

func WithOtel() Option

WithOtel enables OpenTelemetry instrumentation via the kotel plugin: one span per produce/consume operation plus per-broker meter metrics. Uses the global tracer/meter providers — call otel.Setup and register the provider with app.App before the kafka constructors so the providers are non-noop.

func WithProduceTimeout added in v1.3.0

func WithProduceTimeout(d time.Duration) Option

WithProduceTimeout overrides the per-call produce timeout for Producer. Default: 30s. Ignored by NewClient and NewConsumer.

type Producer added in v1.3.0

type Producer struct {
	// contains filtered or unexported fields
}

Producer is a synchronous single-record kafka producer wrapping a franz-go client. Implements lifecycle.Resource — register with app.App.

Construct via NewProducer with a ProducerConfig and Options.

func NewProducer added in v1.3.0

func NewProducer(cfg ProducerConfig, opts ...Option) (*Producer, error)

NewProducer creates a synchronous producer. The Brokers list must be non-empty. cfg.Topic is purely informational — Produce takes the topic per-call.

Default timeout is 30s; override with WithProduceTimeout.

func (*Producer) Produce added in v1.3.0

func (p *Producer) Produce(topic, key string, message []byte) error

Produce sends a single record synchronously. Blocks until the broker acks or the producer's per-call timeout expires.

func (*Producer) Shutdown added in v1.3.0

func (p *Producer) Shutdown(_ context.Context) error

Shutdown closes the underlying client, blocking until in-flight produces drain. Implements lifecycle.Resource.

Idempotent and concurrent-safe.

type ProducerConfig

type ProducerConfig struct {
	// Brokers is the seed broker list.
	Brokers []string

	// Topic is the producer's default topic. Used when callers pass an
	// empty topic to Produce; otherwise this is purely informational.
	Topic string
}

ProducerConfig describes a synchronous producer. Plain fields, no struct tags.

type Publisher added in v1.3.0

type Publisher interface {
	Produce(topic, key string, message []byte) error
}

Publisher is the minimal produce-side interface implemented by *Producer. JSONProducer accepts any Publisher so callers can swap in fakes for tests.

Named Publisher (not Producer) to avoid collision with the *Producer struct.

type Topic added in v1.3.0

type Topic string

Topic is a typed alias for a Kafka topic name. Used in Message and the processor surface to keep call sites readable.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL