Documentation ¶
Overview ¶
Package kafka wraps github.com/twmb/franz-go (kgo) with the lifecycle contract used across core: typed Configs, functional Options, Shutdown and Healthcheck methods, and an opt-in OpenTelemetry instrumentation pipeline via the kotel plugin.
The package exposes three primary types:
- Client (Resource) — base kgo.Client wrapper for callers that build their own producer/consumer logic.
- Producer (Resource) — synchronous single-record producer.
- Consumer (Runner) — consumer group with manual-commit semantics.
All three implement lifecycle.Resource (and Consumer additionally implements lifecycle.Runner), so they can be registered with app.App.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrEmptyBrokersList is returned when ClientConfig/ConsumerConfig/
	// ProducerConfig is constructed without any seed brokers.
	ErrEmptyBrokersList = errors.New("brokers list cannot be empty")

	// ErrNoBrokersDiscovered is returned by Client.Healthcheck when the
	// underlying kgo.Client has not discovered any brokers yet (the seed
	// brokers were unreachable, or no metadata fetch has completed).
	ErrNoBrokersDiscovered = errors.New("no brokers discovered")

	// ErrInvalidOffset is returned by NewConsumer when ConsumerConfig.Offset
	// is set to a value other than "" / "newest" / "oldest".
	ErrInvalidOffset = errors.New("invalid offset")

	// ErrNilProcessor is returned by NewConsumer when the MessageProcessor
	// argument is nil.
	ErrNilProcessor = errors.New("processor cannot be nil")

	// ErrNilConsumerGroup is returned by NewConsumer when ConsumerConfig.Group
	// is empty.
	ErrNilConsumerGroup = errors.New("consumer group cannot be empty")

	// ErrEmptyTopic is returned by NewConsumer when ConsumerConfig.Topics
	// is empty.
	ErrEmptyTopic = errors.New("topics list cannot be empty")
)
Sentinel errors returned by kafka constructors and methods. Their identity is stable, so callers can match them with errors.Is.
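Matching a sentinel with errors.Is keeps working even when the error has been wrapped with extra context. The sketch below is self-contained and uses a local stand-in for ErrEmptyBrokersList; newClient and isConfigError are hypothetical helpers for illustration, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrEmptyBrokersList is a local stand-in for the package's sentinel of the
// same name, declared here so the example compiles on its own.
var ErrEmptyBrokersList = errors.New("brokers list cannot be empty")

// newClient is a hypothetical constructor that wraps the sentinel with context.
func newClient(brokers []string) error {
	if len(brokers) == 0 {
		return fmt.Errorf("kafka client: %w", ErrEmptyBrokersList)
	}
	return nil
}

// isConfigError reports whether err stems from an empty broker list.
// errors.Is walks the wrap chain, so added context does not break the match.
func isConfigError(err error) bool {
	return errors.Is(err, ErrEmptyBrokersList)
}

func main() {
	err := newClient(nil)
	if isConfigError(err) {
		fmt.Println("configuration error:", err)
	}
}
```

Comparing with == would fail here because the returned error is a wrapper, which is why errors.Is is the safer default.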
Functions ¶
This section is empty.
Types ¶
type Client ¶ added in v1.3.0
type Client struct {
// contains filtered or unexported fields
}
Client is a base wrapper around *kgo.Client. Use this when you need to build your own producer or consumer logic on top of franz-go primitives. For ready-made producers and consumers, see Producer and Consumer.
Implements lifecycle.Resource and lifecycle.Healthchecker.
func (*Client) Healthcheck ¶ added in v1.3.0
Healthcheck verifies that the client has discovered at least one broker. Implements lifecycle.Healthchecker.
func (*Client) Shutdown ¶ added in v1.3.0
Shutdown closes the underlying client. Implements lifecycle.Resource.
Idempotent and concurrent-safe: the close runs exactly once.
kgo.Client.Close blocks until in-flight produces drain. The ctx is currently advisory — kgo does not honor it directly — but the signature matches lifecycle.Resource for uniform integration with app.App.
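One common way to obtain "runs exactly once, safe from many goroutines" is sync.Once. The sketch below assumes that approach; it is not taken from this package's source, and resource, closer, and countingCloser are hypothetical stand-ins.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// closer is a stand-in for the underlying client; only Close matters here.
type closer interface{ Close() }

// countingCloser records how many times Close ran.
type countingCloser struct {
	mu sync.Mutex
	n  int
}

func (c *countingCloser) Close() {
	c.mu.Lock()
	c.n++
	c.mu.Unlock()
}

// resource is a hypothetical wrapper, not the package's Client.
type resource struct {
	inner closer
	once  sync.Once
}

// Shutdown accepts a context to match the lifecycle signature but does not
// use it, mirroring the "advisory" behavior described above.
func (r *resource) Shutdown(_ context.Context) error {
	r.once.Do(r.inner.Close)
	return nil
}

// shutdownConcurrently calls Shutdown from n goroutines and returns how many
// times the underlying Close actually ran.
func shutdownConcurrently(n int) int {
	cc := &countingCloser{}
	r := &resource{inner: cc}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = r.Shutdown(context.Background())
		}()
	}
	wg.Wait()
	cc.mu.Lock()
	defer cc.mu.Unlock()
	return cc.n
}

func main() {
	fmt.Println("close calls:", shutdownConcurrently(8))
}
```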
type ClientConfig ¶ added in v1.3.0
type ClientConfig struct {
// Brokers is the seed broker list. Required.
Brokers []string
// ClientID is the kgo.ClientID. Optional.
ClientID string
}
ClientConfig describes the seed brokers and client identity for a Client.
type Consumer ¶ added in v1.3.0
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is a kafka consumer-group host backed by a single franz-go client. Implements lifecycle.Runner — register with app.App.
Uses manual commit (DisableAutoCommit + MarkCommitRecords + CommitMarkedOffsets) so the offset-advance decision is driven by the configured ErrorHandler rather than by franz-go's auto-commit.
Default error policy (no WithErrorHandler): log and skip failing records. See WithErrorHandler + ErrorAction for fine-grained control (DLQ, retry, fail-fast).
func NewConsumer ¶ added in v1.3.0
func NewConsumer(cfg ConsumerConfig, processor MessageProcessor, opts ...Option) (*Consumer, error)
NewConsumer creates a consumer-group client.
cfg.Brokers, cfg.Group, and cfg.Topics are required; cfg.Offset defaults to "newest" when empty (other valid value: "oldest").
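The offset rule can be read as a small normalization step. The sketch below is an illustration of that rule only: normalizeOffset is a hypothetical helper, and ErrInvalidOffset is a local stand-in for the package's sentinel.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrInvalidOffset is a local stand-in for the package's sentinel.
var ErrInvalidOffset = errors.New("invalid offset")

// normalizeOffset is a hypothetical helper: it maps the empty string to the
// documented default and rejects anything other than "newest" or "oldest".
func normalizeOffset(offset string) (string, error) {
	switch offset {
	case "", "newest":
		return "newest", nil
	case "oldest":
		return "oldest", nil
	default:
		return "", fmt.Errorf("%w: %q", ErrInvalidOffset, offset)
	}
}

func main() {
	for _, in := range []string{"", "oldest", "latest"} {
		out, err := normalizeOffset(in)
		fmt.Printf("%q -> %q, err=%v\n", in, out, err)
	}
}
```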
func (*Consumer) Run ¶ added in v1.3.0
Run blocks polling for messages until ctx is cancelled (graceful) or the configured ErrorHandler returns ErrorActionStop (fatal).
Returns nil on ctx cancellation. Returns a wrapping error when ErrorActionStop is selected by the handler.
At-least-once semantics: on ErrorActionStop, records that were processed successfully earlier in the same fetch batch are NOT committed — they will be redelivered on restart. Processors must be idempotent to handle this (as they must for any Kafka consumer-group restart).
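Idempotency can be approached in several ways; a minimal sketch is to remember which record IDs have already been applied and treat repeats as no-ops. Everything below is hypothetical: the record type and its ID field are assumptions, and the in-memory set would not survive a process restart, so a real service would typically need a durable store or naturally idempotent writes.

```go
package main

import (
	"fmt"
	"sync"
)

// record is a stand-in for a delivered message; the ID field is an assumption
// (for example a business key carried in the payload).
type record struct {
	ID string
}

// dedupProcessor applies each record ID at most once, so a redelivered
// record becomes a no-op.
type dedupProcessor struct {
	mu      sync.Mutex
	seen    map[string]bool
	applied int
}

func newDedupProcessor() *dedupProcessor {
	return &dedupProcessor{seen: map[string]bool{}}
}

// Process applies the record unless its ID was seen before.
func (p *dedupProcessor) Process(r record) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.seen[r.ID] {
		return nil // already applied; redelivery is harmless
	}
	p.seen[r.ID] = true
	p.applied++
	return nil
}

// appliedCount feeds records through a fresh processor and returns how many
// were actually applied.
func appliedCount(records []record) int {
	p := newDedupProcessor()
	for _, r := range records {
		_ = p.Process(r)
	}
	return p.applied
}

func main() {
	// "a" appears twice, as it would if a batch were replayed.
	batch := []record{{ID: "a"}, {ID: "b"}, {ID: "a"}}
	fmt.Println("applied:", appliedCount(batch))
}
```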
Implements lifecycle.Runner.
type ConsumerConfig ¶
type ConsumerConfig struct {
// Brokers is the seed broker list. Required.
Brokers []string
// Group is the consumer group ID. Required.
Group string
// Offset selects the initial offset on first read of a partition:
// "newest" / "" (default) or "oldest".
Offset string
// Topics is the list of topics to subscribe to. Required.
Topics []string
}
ConsumerConfig describes a consumer group's connection parameters. Plain fields, no struct tags — consumer apps map their viper keys to fields explicitly inside their own config.NewConfig().
type ContextPublisher ¶ added in v1.3.0
ContextPublisher is a typed publisher (e.g., JSONProducer[T]) that takes a typed message and a context.
type ErrorAction ¶ added in v1.3.0
type ErrorAction int
ErrorAction selects what the Consumer should do after MessageProcessor.Process returns a non-nil error. Passed back from a caller-supplied ErrorHandler.
const (
	// ErrorActionSkip logs the error and advances the consumer-group offset
	// past the failing record. Use for poison-pill tolerant pipelines where
	// DLQ is handled by the processor itself.
	//
	// This is the default when no ErrorHandler is configured, preserving the
	// pre-existing behavior.
	ErrorActionSkip ErrorAction = iota

	// ErrorActionRetry does NOT commit the record. The next PollFetches may
	// redeliver it (after a rebalance or client restart). For in-process
	// retry, the caller's ErrorHandler should sleep/backoff before returning.
	//
	// Note: kgo does not re-fetch uncommitted records within the same
	// session automatically; retry here effectively means "let the next
	// rebalance replay it". For tight retries the processor should loop
	// internally and only return Retry after exhausting its budget.
	ErrorActionRetry

	// ErrorActionStop terminates the Consumer: Run returns the original
	// processing error wrapped. The record is NOT committed. Use for fatal
	// conditions where the service should not advance past the failure
	// (e.g., dependency outage that must be surfaced to the operator).
	ErrorActionStop
)
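The effect of each action on a fetch batch can be pictured with a toy model. The sketch below is not the Consumer's implementation: handleBatch is hypothetical, the ErrorAction constants are redeclared locally, and the model tracks marks per record while ignoring partition offsets and real commits.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrorAction and its constants are local stand-ins mirroring the documented names.
type ErrorAction int

const (
	ErrorActionSkip ErrorAction = iota
	ErrorActionRetry
	ErrorActionStop
)

var errProcess = errors.New("processing failed")

// handleBatch models one batch of n records in which record failAt fails and
// the handler answers with action. It returns the records that would be
// marked for commit, or a fatal error when the action is Stop.
func handleBatch(n, failAt int, action ErrorAction) (marked []int, fatal error) {
	for i := 0; i < n; i++ {
		if i != failAt {
			marked = append(marked, i) // processed successfully
			continue
		}
		switch action {
		case ErrorActionSkip:
			marked = append(marked, i) // advance past the failing record
		case ErrorActionRetry:
			// leave the record unmarked so it can be redelivered later
		case ErrorActionStop:
			// nothing from this batch is committed, including earlier successes
			return nil, fmt.Errorf("consumer stopped at record %d: %w", i, errProcess)
		}
	}
	return marked, nil
}

func main() {
	for _, a := range []ErrorAction{ErrorActionSkip, ErrorActionRetry, ErrorActionStop} {
		marked, err := handleBatch(3, 1, a)
		fmt.Printf("action=%d marked=%v err=%v\n", a, marked, err)
	}
}
```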
type ErrorHandler ¶ added in v1.3.0
type ErrorHandler func(ctx context.Context, msg Message, err error) ErrorAction
ErrorHandler is invoked when MessageProcessor.Process returns a non-nil error. It decides what the consumer should do with the failing record.
Implementations typically: emit to a DLQ and return ErrorActionSkip; increment a metric and return ErrorActionSkip; or on fatal errors return ErrorActionStop.
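A handler along those lines might look like the sketch below. The types are local stand-ins so the example compiles by itself; the Message fields are limited to the two byte slices the docs mention, and errDependencyDown, errBadPayload, newHandler, and classify are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented names.
type ErrorAction int

const (
	ErrorActionSkip ErrorAction = iota
	ErrorActionRetry
	ErrorActionStop
)

type Message struct {
	Key     []byte
	Message []byte
}

type ErrorHandler func(ctx context.Context, msg Message, err error) ErrorAction

// Hypothetical error values a processor might return.
var (
	errDependencyDown = errors.New("dependency unavailable")
	errBadPayload     = errors.New("malformed payload")
)

// newHandler stops on dependency outages and otherwise forwards the record
// to a dead-letter callback before skipping it.
func newHandler(dlq func(Message)) ErrorHandler {
	return func(_ context.Context, msg Message, err error) ErrorAction {
		if errors.Is(err, errDependencyDown) {
			return ErrorActionStop
		}
		dlq(msg)
		return ErrorActionSkip
	}
}

// classify runs the handler once and reports the chosen action together with
// the number of records sent to the dead-letter callback.
func classify(err error) (ErrorAction, int) {
	sent := 0
	h := newHandler(func(Message) { sent++ })
	action := h(context.Background(), Message{Key: []byte("k")}, err)
	return action, sent
}

func main() {
	a, n := classify(fmt.Errorf("lookup: %w", errDependencyDown))
	fmt.Println("stop:", a == ErrorActionStop, "dlq:", n)
	a, n = classify(errBadPayload)
	fmt.Println("skip:", a == ErrorActionSkip, "dlq:", n)
}
```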
type JSONProducer ¶ added in v1.3.0
type JSONProducer[T any] struct {
	// contains filtered or unexported fields
}
JSONProducer wraps a Publisher and JSON-encodes T messages on every call. Generic over T to give compile-time payload-type safety.
func NewJSONProducer ¶ added in v1.3.0
func NewJSONProducer[T any](topic, defaultKey string, producer Publisher) *JSONProducer[T]
NewJSONProducer wraps the given Publisher (typically *Producer) and always publishes to the configured topic.
func (*JSONProducer[T]) Produce ¶ added in v1.3.0
func (p *JSONProducer[T]) Produce(ctx context.Context, message T) error
Produce encodes message as JSON and sends it with the constructor's defaultKey.
func (*JSONProducer[T]) ProduceWithKey ¶ added in v1.3.0
func (p *JSONProducer[T]) ProduceWithKey(_ context.Context, key string, message T) error
ProduceWithKey encodes message as JSON and sends it with the given key.
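The relationship between Produce and ProduceWithKey can be sketched as a thin generic wrapper around a byte-oriented publisher. This is an illustration under assumptions: the publisher interface below is hypothetical (the package's Publisher may have a different method set), and jsonProducer, memoryPublisher, and orderCreated exist only for this example.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// publisher is a hypothetical byte-level interface, assumed for this sketch.
type publisher interface {
	Produce(ctx context.Context, topic, key string, value []byte) error
}

// jsonProducer is a stand-in for a typed JSON wrapper bound to one topic.
type jsonProducer[T any] struct {
	topic      string
	defaultKey string
	pub        publisher
}

// ProduceWithKey JSON-encodes message and publishes it under key.
func (p *jsonProducer[T]) ProduceWithKey(ctx context.Context, key string, message T) error {
	value, err := json.Marshal(message)
	if err != nil {
		return fmt.Errorf("encode message: %w", err)
	}
	return p.pub.Produce(ctx, p.topic, key, value)
}

// Produce delegates to ProduceWithKey with the constructor's default key.
func (p *jsonProducer[T]) Produce(ctx context.Context, message T) error {
	return p.ProduceWithKey(ctx, p.defaultKey, message)
}

// memoryPublisher records the last publish so the example can inspect it.
type memoryPublisher struct{ topic, key, value string }

func (m *memoryPublisher) Produce(_ context.Context, topic, key string, value []byte) error {
	m.topic, m.key, m.value = topic, key, string(value)
	return nil
}

type orderCreated struct {
	ID    string `json:"id"`
	Total int    `json:"total"`
}

// publishOrder sends one message and returns what the publisher received.
func publishOrder() (topic, key, value string, err error) {
	pub := &memoryPublisher{}
	p := &jsonProducer[orderCreated]{topic: "orders", defaultKey: "default", pub: pub}
	err = p.Produce(context.Background(), orderCreated{ID: "o-1", Total: 42})
	return pub.topic, pub.key, pub.value, err
}

func main() {
	fmt.Println(publishOrder())
}
```

Because T is fixed at construction, passing any other payload type to Produce is a compile-time error rather than a runtime surprise.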
type Message ¶ added in v1.3.0
Message is the per-record payload delivered to MessageProcessor.Process. The Key and Message byte slices are reused by the underlying fetch loop after Process returns, so copy them if you need to retain their contents beyond that call.
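The following sketch shows why the copy matters, using a local stand-in for Message that carries only the two documented byte slices (the real type may have more fields). retain and demo are hypothetical helpers; the buffer overwrite simulates the fetch loop reusing memory.

```go
package main

import "fmt"

// Message is a local stand-in with only the two documented byte slices.
type Message struct {
	Key     []byte
	Message []byte
}

// retain returns a copy of m whose slices no longer share memory with the
// fetch loop's buffers.
func retain(m Message) Message {
	return Message{
		Key:     append([]byte(nil), m.Key...),
		Message: append([]byte(nil), m.Message...),
	}
}

// demo keeps one aliased and one copied message, then overwrites the shared
// buffer the way a reusing fetch loop might.
func demo() (aliased, copied string) {
	buf := []byte("first")
	m := Message{Message: buf}
	alias := m        // shares buf
	kept := retain(m) // owns its own bytes
	copy(buf, "SECND")
	return string(alias.Message), string(kept.Message)
}

func main() {
	aliased, copied := demo()
	fmt.Printf("aliased=%q copied=%q\n", aliased, copied)
}
```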
type MessageProcessor ¶ added in v1.3.0
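MessageProcessor is what callers implement to handle messages delivered by a Consumer. Process returns nil on success. A non-nil error is passed to the configured ErrorHandler; under the default policy (ErrorActionSkip) it is logged and the consumer keeps running, and only ErrorActionStop halts the consumer.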
type Option ¶ added in v1.3.0
type Option func(*options)
Option configures any of the kafka constructors (NewClient, NewProducer, NewConsumer). Not every option applies to every constructor; options that do not apply are silently ignored.
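The functional-options pattern behind this type can be sketched as follows. The options struct fields and the apply helper are assumptions made for illustration, and the WithProduceTimeout parameter type (time.Duration) is assumed rather than taken from the package.

```go
package main

import (
	"fmt"
	"time"
)

// options is a hypothetical private settings struct.
type options struct {
	produceTimeout time.Duration
	otel           bool
}

// Option mutates the settings; constructors apply them in order.
type Option func(*options)

// WithProduceTimeout is sketched with an assumed time.Duration parameter.
func WithProduceTimeout(d time.Duration) Option {
	return func(o *options) { o.produceTimeout = d }
}

// WithOtel flips an instrumentation flag in this sketch.
func WithOtel() Option {
	return func(o *options) { o.otel = true }
}

// apply starts from defaults (30s, as documented for producers) and then
// lets each option override them.
func apply(opts ...Option) options {
	o := options{produceTimeout: 30 * time.Second}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	fmt.Printf("%+v\n", apply())
	fmt.Printf("%+v\n", apply(WithProduceTimeout(5*time.Second), WithOtel()))
}
```

A constructor that has no use for a given field simply never reads it, which is one way "silently ignored" can fall out of this design.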
func WithErrorHandler ¶ added in v1.3.0
func WithErrorHandler(h ErrorHandler) Option
WithErrorHandler configures how a Consumer created by NewConsumer reacts to a MessageProcessor error. Without this option, the Consumer logs and skips failing records (ErrorActionSkip), which silently advances the consumer-group offset.
Typical usages:
- Return ErrorActionSkip after pushing the payload to a DLQ topic.
- Return ErrorActionStop for unrecoverable dependency outages so the service fails loudly instead of losing messages.
- Return ErrorActionRetry to leave the record uncommitted (next rebalance redelivers it).
Ignored by NewClient and NewProducer.
func WithKgoOpts ¶ added in v1.3.0
WithKgoOpts appends raw kgo.Opt values to the underlying franz-go client. Escape hatch for tuning kgo features that don't have a typed wrapper here (for example a custom partitioner, SASL, or TLS).
func WithLogger ¶ added in v1.3.0
WithLogger attaches a *slog.Logger used for lifecycle events. Defaults to slog.Default() when omitted.
func WithOtel ¶ added in v1.3.0
func WithOtel() Option
WithOtel enables OpenTelemetry instrumentation via the kotel plugin: one span per produce/consume operation plus per-broker meter metrics. Uses the global tracer/meter providers — call otel.Setup and register the provider with app.App before the kafka constructors so the providers are non-noop.
func WithProduceTimeout ¶ added in v1.3.0
WithProduceTimeout overrides the per-call produce timeout for Producer. Default: 30s. Ignored by NewClient and NewConsumer.
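A per-call timeout is commonly implemented by deriving a deadline-bound context for each send. The sketch below assumes that approach and does not claim to reproduce the Producer's internals; produceWithTimeout, slowSend, and timedOut are hypothetical helpers.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// produceWithTimeout bounds a single send with its own deadline.
func produceWithTimeout(parent context.Context, timeout time.Duration, send func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()
	return send(ctx)
}

// slowSend simulates a broker that needs ack to acknowledge a record and
// gives up early if the context is done first.
func slowSend(ack time.Duration) func(context.Context) error {
	return func(ctx context.Context) error {
		select {
		case <-time.After(ack):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// timedOut reports whether a send with the given timeout and simulated ack
// latency ends with context.DeadlineExceeded.
func timedOut(timeout, ack time.Duration) bool {
	err := produceWithTimeout(context.Background(), timeout, slowSend(ack))
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println("short timeout:", timedOut(10*time.Millisecond, time.Second))
	fmt.Println("long timeout:", timedOut(time.Second, time.Millisecond))
}
```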
type Producer ¶ added in v1.3.0
type Producer struct {
// contains filtered or unexported fields
}
Producer is a synchronous single-record kafka producer wrapping a franz-go client. Implements lifecycle.Resource — register with app.App.
Construct via NewProducer with a ProducerConfig and Options.
func NewProducer ¶ added in v1.3.0
func NewProducer(cfg ProducerConfig, opts ...Option) (*Producer, error)
NewProducer creates a synchronous producer. The Brokers list must be non-empty. cfg.Topic is the default topic, used only when a caller passes an empty topic to Produce; otherwise Produce takes the topic per call.
Default timeout is 30s; override with WithProduceTimeout.
type ProducerConfig ¶
type ProducerConfig struct {
// Brokers is the seed broker list. Required.
Brokers []string
// Topic is the producer's default topic. Used when callers pass an
// empty topic to Produce; otherwise this is purely informational.
Topic string
}
ProducerConfig describes a synchronous producer. Plain fields, no struct tags.