xkafka

v0.11.0
Published: Jan 27, 2026 License: MIT Imports: 10 Imported by: 9

Documentation

Overview

Package xkafka provides consumers & producers to work efficiently with Kafka. `Message` is the core data structure for xkafka. `xkafka` comes with implementations that support:

  • HTTP-like handler for message processing
  • Concurrent message consumer
  • Batch message consumer with retries
  • Middleware support for consumer & producer

## Consumer

The processing mode is determined by the xkafka.Concurrency option. By default, the consumer is initialized with `enable.auto.offset.store=false`. The offset is "stored" after the message is processed. The offset is "committed" based on the `auto.commit.interval.ms` option.

It is important to understand the difference between "store" and "commit". The offset is "stored" in the consumer's memory and is "committed" to Kafka. The offset is "stored" after the message is processed and the `message.Status` is Success or Skip. The stored offsets will be automatically committed, unless the `ManualCommit` option is enabled.
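The store/commit distinction can be illustrated with a small stand-alone model. This is only a sketch: `offsetTracker`, `Store`, `Commit`, and `simulate` are hypothetical names local to this example, not part of xkafka or the underlying Kafka client, and the model ignores partitions and other client details.

```go
package main

import "fmt"

// offsetTracker is a hypothetical in-memory model of the two steps.
// It is not an xkafka type.
type offsetTracker struct {
	stored    int64 // latest offset marked as processed, held in memory only
	committed int64 // latest offset that has been sent to Kafka
}

// Store records a processed offset locally; nothing is sent to Kafka.
func (t *offsetTracker) Store(offset int64) {
	if offset > t.stored {
		t.stored = offset
	}
}

// Commit publishes whatever is currently stored, as a periodic
// auto-commit or a manual commit would.
func (t *offsetTracker) Commit() {
	t.committed = t.stored
}

// simulate stores the given offsets, commits once at the end, and
// returns the committed offset before and after that commit.
func simulate(offsets []int64) (before, after int64) {
	t := &offsetTracker{stored: -1, committed: -1}
	for _, o := range offsets {
		t.Store(o)
	}
	before = t.committed
	t.Commit()
	return before, t.committed
}

func main() {
	before, after := simulate([]int64{0, 1, 2})
	fmt.Println("committed before commit:", before)
	fmt.Println("committed after commit:", after)
}
```

In this model, a crash between `Store` and `Commit` loses only the in-memory value, so the messages since the last commit would be delivered again after a restart.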

### Error Handling

By default, xkafka.Consumer will stop processing, commit the last stored offset, and exit if there is a Kafka error or if the handler returns an error.

Errors can be handled by using one or more of the following options:

  • Within the handler implementation
  • Using error handling & retry middlewares
  • Through the catch-all xkafka.ErrorHandler option

xkafka.ErrorHandler is called for every error that is not handled by the handler or the middlewares. It is also called for errors returned by the underlying Kafka client.
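As an illustration of the catch-all option, the sketch below shows an error handler with the documented `func(err error) error` shape that swallows retryable errors and passes everything else through. To keep the example self-contained, `errRetryable`, `errFatal`, `errorHandler`, and `ignoreRetryable` are local stand-ins rather than the library's own identifiers. It assumes that returning nil means the error was handled and that returning a non-nil error lets the default stop behavior apply.

```go
package main

import (
	"errors"
	"fmt"
)

// errRetryable stands in for xkafka.ErrRetryable so that the example
// compiles without the library.
var errRetryable = errors.New("xkafka: retryable error")

// errFatal is a hypothetical non-retryable error used for the demo.
var errFatal = errors.New("fatal error")

// errorHandler mirrors the documented signature of xkafka.ErrorHandler.
type errorHandler func(err error) error

// ignoreRetryable swallows retryable errors (including wrapped ones)
// and returns every other error unchanged.
func ignoreRetryable(err error) error {
	if errors.Is(err, errRetryable) {
		return nil
	}
	return err
}

func main() {
	var h errorHandler = ignoreRetryable

	wrapped := fmt.Errorf("poll failed: %w", errRetryable)
	fmt.Println("retryable swallowed:", h(wrapped) == nil)
	fmt.Println("fatal passed through:", h(errFatal) == errFatal)
}
```

With the real library, a function of this shape would be passed as `xkafka.ErrorHandler(...)` when constructing the consumer, as the Consumer example does with `ignoreError`.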

### Sequential Processing

Sequential processing is the default mode. It is the same as xkafka.Concurrency(1).

### Async Processing

Async processing is enabled by setting xkafka.Concurrency to a value greater than 1. The consumer will use a pool of goroutines to process messages concurrently. Offsets are stored and committed in the order that the messages are received.
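The documentation does not describe how ordering is preserved internally. The following stand-alone sketch shows one possible way to process messages on a worker pool while still recording offsets in arrival order; `processInOrder` and its parameters are hypothetical and only illustrate the idea.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// processInOrder runs work for every offset on a pool of `workers`
// goroutines and returns the offsets in the order they were "stored".
// Each message gets its own done channel, so the storing loop can wait
// for message i even when later messages finish first.
func processInOrder(offsets []int64, workers int, work func(int64)) []int64 {
	done := make([]chan struct{}, len(offsets))
	for i := range done {
		done[i] = make(chan struct{})
	}

	jobs := make(chan int)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range jobs {
				work(offsets[i])
				close(done[i]) // signal that message i is processed
			}
		}()
	}

	// Feed message indexes to the pool in arrival order.
	go func() {
		for i := range offsets {
			jobs <- i
		}
		close(jobs)
	}()

	stored := make([]int64, 0, len(offsets))
	for i := range offsets {
		<-done[i]
		stored = append(stored, offsets[i])
	}
	wg.Wait()
	return stored
}

func main() {
	// Earlier offsets sleep longer, so they tend to finish last.
	slowFirst := func(o int64) {
		time.Sleep(time.Duration(4-o) * time.Millisecond)
	}
	fmt.Println(processInOrder([]int64{0, 1, 2, 3, 4}, 3, slowFirst))
}
```

The trade-off in this design is that one slow message delays the storing of every later offset, even though those messages may already be processed.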

### Manual Commit

By default, the consumer will automatically commit the offset based on the `auto.commit.interval.ms` option, asynchronously in the background.

The consumer can be configured to commit the offset manually by setting the `xkafka.ManualCommit` option to true. When ManualCommit is enabled, the consumer will synchronously commit the offset after each message is processed.

NOTE: Enabling ManualCommit will add an overhead to each message. It is recommended to use ManualCommit only when necessary.

Constants

This section is empty.

Variables

var (
	// ErrRetryable is the error message for retryable errors.
	ErrRetryable = errors.New("xkafka: retryable error")
	// ErrRequiredOption is returned when a required option is
	// not provided.
	ErrRequiredOption = errors.New("xkafka: required option not provided")
)

Functions

func NoopErrorHandler

func NoopErrorHandler(err error) error

NoopErrorHandler is an ErrorHandler that passes the error through.

Types

type AckFunc

type AckFunc func(m *Message)

AckFunc is a callback function triggered for every ack.

type Batch added in v0.10.0

type Batch struct {
	ID       string
	Messages []*Message
	Status   Status
	// contains filtered or unexported fields
}

Batch is a group of messages that are processed together.

func NewBatch added in v0.10.0

func NewBatch() *Batch

NewBatch creates a new Batch.

func (*Batch) AckFail added in v0.10.0

func (b *Batch) AckFail(err error) error

AckFail marks the batch as failed to process.

func (*Batch) AckSkip added in v0.10.0

func (b *Batch) AckSkip()

AckSkip marks the batch as skipped.

func (*Batch) AckSuccess added in v0.10.0

func (b *Batch) AckSuccess()

AckSuccess marks the batch as successfully processed.

func (*Batch) Err added in v0.10.0

func (b *Batch) Err() error

Err returns the error that caused the batch to fail.

func (*Batch) GroupMaxOffset added in v0.10.0

func (b *Batch) GroupMaxOffset() []kafka.TopicPartition

GroupMaxOffset returns the maximum offset for each topic-partition in the batch.
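The grouping described above can be sketched without the library. In this example `message`, `topicPartition`, and `groupMaxOffset` are local, hypothetical stand-ins; the real method returns `[]kafka.TopicPartition`, while the sketch returns a map to keep the result easy to inspect.

```go
package main

import "fmt"

// message is a minimal stand-in for the fields of xkafka.Message that
// matter for grouping.
type message struct {
	Topic     string
	Partition int32
	Offset    int64
}

// topicPartition identifies one partition of one topic.
type topicPartition struct {
	Topic     string
	Partition int32
}

// groupMaxOffset returns the highest offset seen for each
// topic-partition in msgs.
func groupMaxOffset(msgs []message) map[topicPartition]int64 {
	out := make(map[topicPartition]int64)
	for _, m := range msgs {
		key := topicPartition{Topic: m.Topic, Partition: m.Partition}
		if cur, ok := out[key]; !ok || m.Offset > cur {
			out[key] = m.Offset
		}
	}
	return out
}

func main() {
	batch := []message{
		{Topic: "orders", Partition: 0, Offset: 41},
		{Topic: "orders", Partition: 1, Offset: 7},
		{Topic: "orders", Partition: 0, Offset: 42},
	}
	for tp, offset := range groupMaxOffset(batch) {
		fmt.Printf("%s[%d] max offset %d\n", tp.Topic, tp.Partition, offset)
	}
}
```

Grouping per topic-partition matters because offsets are only comparable within a single partition, so a batch that spans partitions needs one maximum per partition.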

func (*Batch) MaxOffset added in v0.10.0

func (b *Batch) MaxOffset() int64

MaxOffset returns the maximum offset among the messages in the batch.

type BatchConsumer added in v0.10.0

type BatchConsumer struct {
	// contains filtered or unexported fields
}

BatchConsumer manages the consumption of messages from kafka topics and processes them in batches.

func NewBatchConsumer added in v0.10.0

func NewBatchConsumer(name string, handler BatchHandler, opts ...ConsumerOption) (*BatchConsumer, error)

NewBatchConsumer creates a new BatchConsumer instance.

func (*BatchConsumer) Run added in v0.10.0

func (c *BatchConsumer) Run(ctx context.Context) (err error)

Run starts running the BatchConsumer. The component will stop running when the context is closed. Run blocks until the context is closed or an error occurs.

func (*BatchConsumer) Use added in v0.10.0

func (c *BatchConsumer) Use(mwf ...BatchMiddlewarer)

Use appends one or more BatchMiddlewarer values to the chain.

type BatchHandler added in v0.10.0

type BatchHandler interface {
	HandleBatch(ctx context.Context, b *Batch) error
}

BatchHandler defines a handler for a batch of messages.

type BatchHandlerFunc added in v0.10.0

type BatchHandlerFunc func(ctx context.Context, b *Batch) error

BatchHandlerFunc defines a function for handling a batch.

func (BatchHandlerFunc) HandleBatch added in v0.10.0

func (h BatchHandlerFunc) HandleBatch(ctx context.Context, b *Batch) error

HandleBatch implements BatchHandler interface.

type BatchMiddlewareFunc added in v0.10.0

type BatchMiddlewareFunc func(BatchHandler) BatchHandler

BatchMiddlewareFunc defines a function for batch middleware.

func (BatchMiddlewareFunc) BatchMiddleware added in v0.10.0

func (mw BatchMiddlewareFunc) BatchMiddleware(handler BatchHandler) BatchHandler

BatchMiddleware implements BatchMiddlewarer interface.

type BatchMiddlewarer added in v0.10.0

type BatchMiddlewarer interface {
	BatchMiddleware(handler BatchHandler) BatchHandler
}

BatchMiddlewarer is an interface for batch message handler middleware.

type BatchSize added in v0.10.0

type BatchSize int

BatchSize defines the maximum number of messages in a batch.

type BatchTimeout added in v0.10.0

type BatchTimeout time.Duration

BatchTimeout defines the maximum time to wait for a batch to be filled.
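How a size limit and a timeout can interact is shown in the stand-alone sketch below. It is not the library's implementation: `collectBatches` and `batchDemo` are hypothetical, and the sketch assumes that a batch is flushed when it reaches the size limit, when the timeout window elapses, or when the input ends.

```go
package main

import (
	"fmt"
	"time"
)

// collectBatches groups values from in into batches of at most size
// items. A partially filled batch is flushed when timeout elapses or
// when in is closed.
func collectBatches(in <-chan int, size int, timeout time.Duration) [][]int {
	var batches [][]int
	batch := make([]int, 0, size)
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	// flush emits the current batch (if any) and restarts the window.
	flush := func() {
		if len(batch) > 0 {
			batches = append(batches, batch)
			batch = make([]int, 0, size)
		}
		if !timer.Stop() {
			// Drain a pending tick, if one is still queued.
			select {
			case <-timer.C:
			default:
			}
		}
		timer.Reset(timeout)
	}

	for {
		select {
		case v, ok := <-in:
			if !ok {
				flush()
				return batches
			}
			batch = append(batch, v)
			if len(batch) >= size {
				flush()
			}
		case <-timer.C:
			flush()
		}
	}
}

// batchDemo feeds five values through collectBatches with a size
// limit of two and a timeout long enough not to fire.
func batchDemo() [][]int {
	in := make(chan int, 5)
	for i := 1; i <= 5; i++ {
		in <- i
	}
	close(in)
	return collectBatches(in, 2, time.Second)
}

func main() {
	for i, b := range batchDemo() {
		fmt.Printf("batch %d: %v\n", i, b)
	}
}
```

A larger size limit amortizes per-batch work over more messages, while a shorter timeout bounds how long a message can wait in a partially filled batch.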

type Brokers

type Brokers []string

Brokers sets the kafka brokers.

type Concurrency

type Concurrency int

Concurrency defines the concurrency of the consumer.

type ConfigMap

type ConfigMap map[string]any

ConfigMap allows setting kafka configuration.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer manages the consumption of messages from kafka topics and the processing of those messages.

Example
handler := HandlerFunc(func(ctx context.Context, msg *Message) error {
	// do something with the message

	// acknowledge the message with success, skip or error
	msg.AckSuccess()

	return nil
})

ignoreError := func(err error) error {
	// ignore error
	return nil
}

consumer, err := NewConsumer("consumer-id", handler,
	Concurrency(10), // default is 1. values > 1 enable async processing
	Topics{"test"},
	Brokers{"localhost:9092"},
	// default behavior is to stop the consumer. this option allows customizing the error handling
	ErrorHandler(ignoreError),
	// custom configuration for the underlying kafka consumer
	ConfigMap{
		"auto.commit.interval.ms": 1000,
	},
	// default behavior is to commit messages automatically.
	// this option triggers manual commit after each message is processed.
	ManualCommit(true),
)
if err != nil {
	panic(err)
}

consumer.Use(
	// middleware to log messages
	MiddlewareFunc(func(next Handler) Handler {
		return HandlerFunc(func(ctx context.Context, msg *Message) error {
			// log the message
			return next.Handle(ctx, msg)
		})
	}),
)

if err := consumer.Run(context.Background()); err != nil {
	panic(err)
}

consumer.Close()

func NewConsumer

func NewConsumer(name string, handler Handler, opts ...ConsumerOption) (*Consumer, error)

NewConsumer creates a new Consumer instance.

func (*Consumer) Close

func (c *Consumer) Close()

Close closes the consumer.

func (*Consumer) GetMetadata

func (c *Consumer) GetMetadata() (*Metadata, error)

GetMetadata returns the metadata for the consumer.

func (*Consumer) Run

func (c *Consumer) Run(ctx context.Context) error

Run starts running the Consumer. The component will stop running when the context is closed. Run blocks until the context is closed or an error occurs.

func (*Consumer) Start

func (c *Consumer) Start() error

Start subscribes to the configured topics and starts consuming messages. It runs the handler for each message in a separate goroutine.

This method is non-blocking and returns immediately after subscribing. Use Run instead if you want to block until the context is closed or an error occurs.

Errors are handled by the ErrorHandler if set, otherwise they stop the consumer and are returned.

func (*Consumer) Use

func (c *Consumer) Use(mwf ...MiddlewareFunc)

Use appends a MiddlewareFunc to the chain. Middleware can be used to intercept or otherwise modify, process or skip messages. They are executed in the order that they are applied to the Consumer.
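The ordering rule can be demonstrated with a stand-alone sketch of the handler and middleware shapes. All types here (`message`, `handler`, `handlerFunc`, `middlewareFunc`) are local re-declarations, and `chain`, `trace`, and `runChain` are hypothetical helpers. The way `chain` wraps handlers is an assumption about one common way to achieve the documented order, not a description of xkafka internals.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins for the documented Message, Handler, HandlerFunc,
// and MiddlewareFunc shapes.
type message struct{ Value []byte }

type handler interface {
	Handle(ctx context.Context, m *message) error
}

type handlerFunc func(ctx context.Context, m *message) error

func (h handlerFunc) Handle(ctx context.Context, m *message) error { return h(ctx, m) }

type middlewareFunc func(handler) handler

// chain wraps h so that the first middleware in mws runs first.
// Wrapping starts from the last middleware, which makes the first one
// the outermost layer.
func chain(h handler, mws ...middlewareFunc) handler {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// trace returns a middleware that records its name before calling the
// next handler.
func trace(name string, log *[]string) middlewareFunc {
	return func(next handler) handler {
		return handlerFunc(func(ctx context.Context, m *message) error {
			*log = append(*log, name)
			return next.Handle(ctx, m)
		})
	}
}

// runChain applies two tracing middlewares and returns the call order.
func runChain() []string {
	var log []string
	final := handlerFunc(func(ctx context.Context, m *message) error {
		log = append(log, "handler")
		return nil
	})
	h := chain(final, trace("first", &log), trace("second", &log))
	_ = h.Handle(context.Background(), &message{Value: []byte("v")})
	return log
}

func main() {
	fmt.Println(runChain())
}
```

Because each middleware decides whether to call `next.Handle`, an early middleware can skip a message entirely, and later middlewares and the handler then never see it.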

type ConsumerOption added in v0.8.0

type ConsumerOption interface {
	// contains filtered or unexported methods
}

ConsumerOption is an interface for consumer options.

type DeliveryCallback

type DeliveryCallback AckFunc

DeliveryCallback is a callback function triggered for every published message. Works only for xkafka.Producer.

type ErrorHandler

type ErrorHandler func(err error) error

ErrorHandler is a callback function that is called when an error occurs.

type Handler

type Handler interface {
	Handle(ctx context.Context, m *Message) error
}

Handler defines a message handler.

type HandlerFunc

type HandlerFunc func(ctx context.Context, m *Message) error

HandlerFunc defines a function for handling messages.

func (HandlerFunc) Handle

func (h HandlerFunc) Handle(ctx context.Context, m *Message) error

Handle implements Handler interface on HandlerFunc.

type ManualCommit added in v0.5.0

type ManualCommit bool

ManualCommit disables the auto commit and calls the `Commit` after every message is marked as `Success` or `Skip` by the handler.

Works only for xkafka.Consumer.

WARNING: Using this option will increase the message processing time, because of the synchronous `Commit` for every message.

type Message

type Message struct {
	ID        string    `json:"id,omitempty"`
	Topic     string    `json:"topic,omitempty"`
	Partition int32     `json:"partition,omitempty"`
	Group     string    `json:"group,omitempty"`
	Key       []byte    `json:"key,omitempty"`
	Value     []byte    `json:"value,omitempty"`
	Timestamp time.Time `json:"timestamp,omitempty"`
	Status    Status    `json:"status,omitempty"`
	ErrMsg    string    `json:"err_msg,omitempty"`
	Offset    int64     `json:"offset,omitempty"`
	// contains filtered or unexported fields
}

Message holds the Kafka message data and manages the lifecycle of the message.

func (*Message) AckFail

func (m *Message) AckFail(err error) bool

AckFail marks the message as failed and stores the error. It overrides any existing ack status.

func (*Message) AckSkip

func (m *Message) AckSkip() bool

AckSkip marks the message as skipped. Overrides any existing ack status.

func (*Message) AckSuccess

func (m *Message) AckSuccess() bool

AckSuccess marks the message as successfully processed. Overrides any existing ack status.

func (*Message) AddCallback

func (m *Message) AddCallback(fn AckFunc)

AddCallback adds the callback func to the call stack.

func (*Message) Err

func (m *Message) Err() error

Err returns the underlying error that caused the message to fail. DESIGN: Intentionally called Err to avoid confusion with Error().

func (*Message) Header

func (m *Message) Header(key string) []byte

Header returns the value for the given key of the header field of the message.

func (*Message) Headers

func (m *Message) Headers() map[string][]byte

Headers returns a map to access the key and value of the header field of the message.

func (*Message) SetHeader

func (m *Message) SetHeader(key string, value []byte)

SetHeader stores the key and value of the header field of the message.

type Metadata

type Metadata = kafka.Metadata

Metadata contains broker and topic metadata for all (matching) topics.

type MetadataTimeout

type MetadataTimeout time.Duration

MetadataTimeout defines the timeout for the consumer metadata request.

type MiddlewareFunc

type MiddlewareFunc func(Handler) Handler

MiddlewareFunc defines a function for middleware.

func (MiddlewareFunc) Middleware

func (mw MiddlewareFunc) Middleware(handler Handler) Handler

Middleware implements Middlewarer interface.

type Middlewarer added in v0.10.0

type Middlewarer interface {
	Middleware(handler Handler) Handler
}

Middlewarer is an interface for message handler middleware.

type PollTimeout

type PollTimeout time.Duration

PollTimeout defines the read (poll) timeout for the consumer.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer manages the production of messages to kafka topics. It provides both synchronous and asynchronous publish methods and a channel to stream delivery events.

Example
ctx, cancel := context.WithCancel(context.Background())

producer, err := NewProducer(
	"producer-id",
	Brokers{"localhost:9092"},
	ConfigMap{
		"socket.keepalive.enable": true,
	},
)
if err != nil {
	panic(err)
}

producer.Use(
	// middleware to log messages
	MiddlewareFunc(func(next Handler) Handler {
		return HandlerFunc(func(ctx context.Context, msg *Message) error {
			// log the message
			return next.Handle(ctx, msg)
		})
	}),
)

go func() {
	err := producer.Run(ctx)
	if err != nil {
		panic(err)
	}
}()

msg := &Message{
	Topic: "test",
	Key:   []byte("key"),
	Value: []byte("value"),
}

if err := producer.Publish(ctx, msg); err != nil {
	panic(err)
}

// cancel the context to stop the producer
cancel()

func NewProducer

func NewProducer(name string, opts ...ProducerOption) (*Producer, error)

NewProducer creates a new Producer.

func (*Producer) AsyncPublish

func (p *Producer) AsyncPublish(ctx context.Context, msg *Message) error

AsyncPublish sends messages to the kafka topic asynchronously.

Example
ctx, cancel := context.WithCancel(context.Background())

// default callback function called after each message
// handled by the producer
callback := func(msg *Message) {
	// do something with the message
}

producer, err := NewProducer(
	"producer-id",
	Brokers{"localhost:9092"},
	ConfigMap{
		"socket.keepalive.enable": true,
	},
	DeliveryCallback(callback),
)
if err != nil {
	panic(err)
}

go func() {
	err := producer.Run(ctx)
	if err != nil {
		panic(err)
	}
}()

msg := &Message{
	Topic: "test",
	Key:   []byte("key"),
	Value: []byte("value"),
}

// each message can have its own callback function
// in addition to the default callback function
msg.AddCallback(func(m *Message) {
	// do something with the message
})

err = producer.AsyncPublish(ctx, msg)
if err != nil {
	panic(err)
}

// cancel the context to stop the producer
cancel()

func (*Producer) Close

func (p *Producer) Close()

Close waits for all messages to be delivered and closes the producer.

func (*Producer) Publish

func (p *Producer) Publish(ctx context.Context, msg *Message) error

Publish sends messages to the kafka topic synchronously. Returns error if the message cannot be enqueued or if there's a Kafka error.

func (*Producer) Run

func (p *Producer) Run(ctx context.Context) error

Run manages both starting and stopping the producer.

func (*Producer) Start

func (p *Producer) Start(ctx context.Context) error

Start starts the kafka event handling. It blocks until the context is cancelled.

func (*Producer) Use

func (p *Producer) Use(mwf ...MiddlewareFunc)

Use appends a MiddlewareFunc to the chain. Middleware can be used to intercept or otherwise modify, process or skip messages. They are executed in the order that they are applied to the Producer.

type ProducerOption added in v0.8.0

type ProducerOption interface {
	// contains filtered or unexported methods
}

ProducerOption is an interface for producer options.

type ShutdownTimeout

type ShutdownTimeout time.Duration

ShutdownTimeout defines the timeout for the consumer/producer to shut down.

type Status

type Status int

Status is an enum for the state of a Message.

const (
	Unassigned Status = iota
	Success
	Fail
	Skip
)

Status enums.
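The overview states that an offset is stored only when the message status is Success or Skip. The sketch below restates that rule against a local copy of the enum. The `status` type and the `shouldStoreOffset` helper are hypothetical and exist only for illustration.

```go
package main

import "fmt"

// status is a local copy of the documented Status enum, declared in
// the same order so the numeric values match.
type status int

const (
	unassigned status = iota
	success
	fail
	skip
)

// shouldStoreOffset reports whether a message with status s would have
// its offset stored under the rule described in the overview.
func shouldStoreOffset(s status) bool {
	return s == success || s == skip
}

func main() {
	for _, s := range []status{unassigned, success, fail, skip} {
		fmt.Printf("status %d stores offset: %t\n", int(s), shouldStoreOffset(s))
	}
}
```

Since the zero value is Unassigned, a message that the handler never acknowledges falls outside the Success or Skip rule.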

func (Status) String

func (s Status) String() string

String returns a string status value.

type Topics

type Topics []string

Topics sets the kafka topics to consume.

Directories

Path Synopsis
middleware module
prometheus module
retry module
zerolog module
