writer

package
v0.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 21, 2026 License: MIT Imports: 5 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func New

func New(opts ...Option) *kafka.Writer

Types

type Option

type Option func(*kafka.Writer)

func WithAddr

func WithAddr(addr net.Addr) Option

Address of the kafka cluster that this writer is configured to send messages to.

This field is required, attempting to write messages to a writer with a nil address will error.

func WithAllowAutoTopicCreation

func WithAllowAutoTopicCreation(allowAutoTopicCreation bool) Option

AllowAutoTopicCreation notifies writer to create topic if missing.

func WithAsync

func WithAsync(async bool) Option

Setting this flag to true causes the WriteMessages method to never block. It also means that errors are ignored since the caller will not receive the returned value. Use this only if you don't care about guarantees of whether the messages were written to kafka.

Defaults to false.

func WithBalancer

func WithBalancer(balancer kafka.Balancer) Option

The balancer used to distribute messages across partitions.

The default is to use a round-robin distribution.

func WithBatchBytes

func WithBatchBytes(batchBytes int64) Option

Limit the maximum size of a request in bytes before being sent to a partition.

The default is to use a kafka default value of 1048576.

func WithBatchSize

func WithBatchSize(batchSize int) Option

Limit on how many messages will be buffered before being sent to a partition.

The default is to use a target batch size of 100 messages.

func WithBatchTimeout

func WithBatchTimeout(batchTimeout time.Duration) Option

Time limit on how often incomplete message batches will be flushed to kafka.

The default is to flush at least every second.

func WithCompletion

func WithCompletion(completion func(messages []kafka.Message, err error)) Option

An optional function called when the writer succeeds or fails the delivery of messages to a kafka partition. When writing the messages fails, the `err` parameter will be non-nil.

The messages that the Completion function is called with have their topic, partition, offset, and time set based on the Produce responses received from kafka. All messages passed to a call to the function have been written to the same partition. The keys and values of messages are referencing the original byte slices carried by messages in the calls to WriteMessages.

The function is called from goroutines started by the writer. Calls to Close will block on the Completion function calls. When the Writer is not writing asynchronously, the WriteMessages call will also block on Completion function, which is a useful guarantee if the byte slices for the message keys and values are intended to be reused after the WriteMessages call returned.

If a completion function panics, the program terminates because the panic is not recovered by the writer and bubbles up to the top of the goroutine's call stack.

func WithCompression

func WithCompression(compression kafka.Compression) Option

Compression set the compression codec to be used to compress messages.

func WithErrorLogger

func WithErrorLogger(errorLogger kafka.Logger) Option

ErrorLogger is the logger used to report errors. If nil, the writer falls back to using Logger instead.

func WithLogger

func WithLogger(logger kafka.Logger) Option

If not nil, specifies a logger used to report internal changes within the writer.

func WithMaxAttempts

func WithMaxAttempts(maxAttempts int) Option

Limit on how many attempts will be made to deliver a message.

The default is to try at most 10 times.

func WithReadTimeout

func WithReadTimeout(readTimeout time.Duration) Option

Timeout for read operations performed by the Writer.

Defaults to 10 seconds.

func WithRequiredAcks

func WithRequiredAcks(requiredAcks kafka.RequiredAcks) Option

Number of acknowledges from partition replicas required before receiving a response to a produce request, the following values are supported:

RequireNone (0)  fire-and-forget, do not wait for acknowledgements from the
RequireOne  (1)  wait for the leader to acknowledge the writes
RequireAll  (-1) wait for the full ISR to acknowledge the writes

Defaults to RequireNone.

func WithTopic

func WithTopic(topic string) Option

Topic is the name of the topic that the writer will produce messages to.

Setting this field or not is a mutually exclusive option. If you set Topic here, you must not set Topic for any produced Message. Otherwise, if you do not set Topic, every Message must have Topic specified.

func WithTransport

func WithTransport(transport kafka.RoundTripper) Option

A transport used to send messages to kafka clusters.

If nil, DefaultTransport is used.

func WithWriteBackoffMax

func WithWriteBackoffMax(writeBackoffMax time.Duration) Option

WriteBackoffMax optionally sets the maximum amount of time the writer waits before it attempts to write a batch of messages

Default: 1s

func WithWriteBackoffMin

func WithWriteBackoffMin(writeBackoffMin time.Duration) Option

WriteBackoffMin optionally sets the smallest amount of time the writer waits before it attempts to write a batch of messages

Default: 100ms

func WithWriteTimeout

func WithWriteTimeout(writeTimeout time.Duration) Option

Timeout for write operation performed by the Writer.

Defaults to 10 seconds.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL