Documentation ¶
Index ¶
- func New(opts ...Option) *kafka.Writer
- type Option
- func WithAddr(addr net.Addr) Option
- func WithAllowAutoTopicCreation(allowAutoTopicCreation bool) Option
- func WithAsync(async bool) Option
- func WithBalancer(balancer kafka.Balancer) Option
- func WithBatchBytes(batchBytes int64) Option
- func WithBatchSize(batchSize int) Option
- func WithBatchTimeout(batchTimeout time.Duration) Option
- func WithCompletion(completion func(messages []kafka.Message, err error)) Option
- func WithCompression(compression kafka.Compression) Option
- func WithErrorLogger(errorLogger kafka.Logger) Option
- func WithLogger(logger kafka.Logger) Option
- func WithMaxAttempts(maxAttempts int) Option
- func WithReadTimeout(readTimeout time.Duration) Option
- func WithRequiredAcks(requiredAcks kafka.RequiredAcks) Option
- func WithTopic(topic string) Option
- func WithTransport(transport kafka.RoundTripper) Option
- func WithWriteBackoffMax(writeBackoffMax time.Duration) Option
- func WithWriteBackoffMin(writeBackoffMin time.Duration) Option
- func WithWriteTimeout(writeTimeout time.Duration) Option
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Option ¶
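The signatures in the index (New takes `...Option`, and each `With*` function returns an Option) match Go's functional options pattern. The source does not show how Option is defined, so the following is only a minimal sketch of that pattern, assuming an option is a function that mutates the writer being built. The names `writer`, `option`, `withTopic`, `withBatchSize`, and `newWriter` are hypothetical stand-ins, not this package's API.

```go
package main

import "fmt"

// writer is a hypothetical stand-in for kafka.Writer with two fields only.
type writer struct {
	Topic     string
	BatchSize int
}

// option is an assumed shape for an Option type: a function that
// mutates the writer under construction.
type option func(*writer)

// withTopic returns an option that sets the topic.
func withTopic(topic string) option {
	return func(w *writer) { w.Topic = topic }
}

// withBatchSize returns an option that sets the batch size.
func withBatchSize(n int) option {
	return func(w *writer) { w.BatchSize = n }
}

// newWriter applies the options in order, so a later option
// overrides an earlier one that touches the same field.
func newWriter(opts ...option) *writer {
	w := &writer{}
	for _, opt := range opts {
		opt(w)
	}
	return w
}

func main() {
	w := newWriter(withTopic("events"), withBatchSize(50), withBatchSize(200))
	fmt.Println(w.Topic, w.BatchSize)
}
```

With this shape, fields that no option touches keep their zero value, which is what lets the underlying writer apply its documented defaults.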
func WithAddr ¶
Address of the kafka cluster that this writer is configured to send messages to.
This field is required. Attempting to write messages to a writer with a nil address returns an error.
func WithAllowAutoTopicCreation ¶
AllowAutoTopicCreation tells the writer to create the topic if it does not exist.
func WithAsync ¶
Setting this flag to true causes the WriteMessages method to never block. It also means that errors are ignored since the caller will not receive the returned value. Use this only if you don't care about guarantees of whether the messages were written to kafka.
Defaults to false.
func WithBalancer ¶
The balancer used to distribute messages across partitions.
The default is to use a round-robin distribution.
func WithBatchBytes ¶
Limit the maximum size of a request in bytes before being sent to a partition.
The default is 1048576 bytes (1 MiB), which matches the kafka default.
func WithBatchSize ¶
Limit on how many messages will be buffered before being sent to a partition.
The default is to use a target batch size of 100 messages.
func WithBatchTimeout ¶
Time limit on how often incomplete message batches will be flushed to kafka.
The default is to flush at least every second.
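The batch size and batch timeout work together: a batch goes out when it is full or when it has waited long enough, whichever comes first. The sketch below is a simplified model of that rule, not the writer's internal logic. `shouldFlush` and the two constants are hypothetical names, and the constants only restate the defaults given above (100 messages, one second).

```go
package main

import (
	"fmt"
	"time"
)

const (
	defaultBatchSize    = 100         // default batch size stated in the text
	defaultBatchTimeout = time.Second // default flush interval stated in the text
)

// shouldFlush is a hypothetical helper. It reports whether a batch holding
// `buffered` messages that has been waiting for `waited` should be sent.
// An empty batch is never flushed.
func shouldFlush(buffered int, waited time.Duration, batchSize int, batchTimeout time.Duration) bool {
	if buffered == 0 {
		return false
	}
	return buffered >= batchSize || waited >= batchTimeout
}

func main() {
	// A full batch is sent immediately.
	fmt.Println(shouldFlush(100, 10*time.Millisecond, defaultBatchSize, defaultBatchTimeout))
	// An incomplete batch is sent once the timeout has elapsed.
	fmt.Println(shouldFlush(3, 2*time.Second, defaultBatchSize, defaultBatchTimeout))
	// An incomplete, recent batch keeps waiting.
	fmt.Println(shouldFlush(3, 10*time.Millisecond, defaultBatchSize, defaultBatchTimeout))
}
```

In this model, lowering the timeout trades throughput for latency, while raising the batch size does the opposite.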
func WithCompletion ¶
An optional function called when the writer succeeds or fails the delivery of messages to a kafka partition. When writing the messages fails, the `err` parameter will be non-nil.
The messages that the Completion function is called with have their topic, partition, offset, and time set based on the Produce responses received from kafka. All messages passed to a single call of the function have been written to the same partition. The keys and values of the messages reference the original byte slices carried by the messages passed to WriteMessages.
The function is called from goroutines started by the writer. Calls to Close block until pending Completion calls return. When the Writer is not writing asynchronously, the WriteMessages call also blocks on the Completion function, which is a useful guarantee if the byte slices for the message keys and values are intended to be reused after the WriteMessages call has returned.
If a completion function panics, the program terminates because the panic is not recovered by the writer and bubbles up to the top of the goroutine's call stack.
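Because the writer does not recover panics raised inside the completion function, one defensive option is to wrap the callback before passing it in. The sketch below shows such a wrapper under stated assumptions: `message` is a hypothetical stand-in for kafka.Message, and `recoverCompletion` is an illustrative helper, not part of this package or of the kafka library.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for kafka.Message.
type message struct {
	Topic     string
	Partition int
	Offset    int64
}

// recoverCompletion wraps a completion callback so that a panic inside it
// is recovered and handed to onPanic instead of ending the program.
func recoverCompletion(fn func([]message, error), onPanic func(interface{})) func([]message, error) {
	return func(msgs []message, err error) {
		defer func() {
			if r := recover(); r != nil {
				onPanic(r)
			}
		}()
		fn(msgs, err)
	}
}

func main() {
	completion := recoverCompletion(
		func(msgs []message, err error) {
			if err != nil {
				// Deliberately panic to exercise the wrapper.
				panic(fmt.Sprintf("delivery of %d message(s) failed: %v", len(msgs), err))
			}
		},
		func(r interface{}) { fmt.Println("recovered:", r) },
	)

	completion([]message{{Topic: "events"}}, errors.New("broker unavailable"))
	fmt.Println("still running")
}
```

Whether swallowing the panic is appropriate depends on the application. Logging it and letting delivery continue is one choice; re-panicking after cleanup is another.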
func WithCompression ¶
func WithCompression(compression kafka.Compression) Option
Compression sets the compression codec used to compress messages.
func WithErrorLogger ¶
ErrorLogger is the logger used to report errors. If nil, the writer falls back to using Logger instead.
func WithLogger ¶
If not nil, specifies a logger used to report internal changes within the writer.
func WithMaxAttempts ¶
Limit on how many attempts will be made to deliver a message.
The default is to try at most 10 times.
func WithRequiredAcks ¶
func WithRequiredAcks(requiredAcks kafka.RequiredAcks) Option
Number of acknowledgements from partition replicas required before receiving a response to a produce request. The following values are supported:
- RequireNone (0): fire-and-forget, do not wait for acknowledgements
- RequireOne (1): wait for the leader to acknowledge the writes
- RequireAll (-1): wait for the full ISR to acknowledge the writes
Defaults to RequireNone.
func WithTopic ¶
Topic is the name of the topic that the writer will produce messages to.
Setting Topic on the writer and setting it on individual messages are mutually exclusive. If you set Topic here, you must not set Topic on any produced Message. If you do not set it here, every Message must specify its own Topic.
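The topic rule amounts to an exclusive-or: exactly one of the writer and the message must carry a topic. A minimal sketch of that check follows. `checkTopic` and both error values are hypothetical, and the errors the writer actually returns may be worded differently.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical error values used only for this illustration.
var (
	errTopicSetTwice = errors.New("topic set on both the writer and the message")
	errTopicMissing  = errors.New("topic set on neither the writer nor the message")
)

// checkTopic returns nil only when exactly one of the two topics is set.
func checkTopic(writerTopic, messageTopic string) error {
	switch {
	case writerTopic != "" && messageTopic != "":
		return errTopicSetTwice
	case writerTopic == "" && messageTopic == "":
		return errTopicMissing
	default:
		return nil
	}
}

func main() {
	fmt.Println(checkTopic("events", ""))      // topic on the writer only
	fmt.Println(checkTopic("", "events"))      // topic on the message only
	fmt.Println(checkTopic("events", "audit")) // both set
	fmt.Println(checkTopic("", ""))            // neither set
}
```

Leaving the writer's topic empty is the usual way to let a single writer produce to several topics, at the cost of setting the topic on every message.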
func WithTransport ¶
func WithTransport(transport kafka.RoundTripper) Option
A transport used to send messages to kafka clusters.
If nil, DefaultTransport is used.
func WithWriteBackoffMax ¶
WriteBackoffMax optionally sets the maximum amount of time the writer waits before it attempts to write a batch of messages.
Default: 1s
func WithWriteBackoffMin ¶
WriteBackoffMin optionally sets the smallest amount of time the writer waits before it attempts to write a batch of messages.
Default: 100ms
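The two backoff options bound the wait between write attempts. The source does not say how the wait grows between those bounds, so the sketch below assumes a simple doubling schedule clamped to the range. `backoff` and the constants are hypothetical, and the writer's actual formula may differ.

```go
package main

import (
	"fmt"
	"time"
)

const (
	defaultBackoffMin = 100 * time.Millisecond // default stated in the text
	defaultBackoffMax = time.Second            // default stated in the text
)

// backoff is a hypothetical helper. It returns the wait before the given
// attempt (1-based): lo for the first attempt, doubling on each later
// attempt, and never more than hi.
func backoff(attempt int, lo, hi time.Duration) time.Duration {
	d := lo
	for i := 1; i < attempt; i++ {
		d *= 2
		if d >= hi {
			return hi
		}
	}
	if d > hi {
		return hi
	}
	return d
}

func main() {
	for attempt := 1; attempt <= 6; attempt++ {
		fmt.Println(attempt, backoff(attempt, defaultBackoffMin, defaultBackoffMax))
	}
}
```

Under this assumed schedule and the default bounds, the wait reaches the one-second ceiling by the fifth attempt and stays there for the remaining attempts.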
func WithWriteTimeout ¶
Timeout for write operations performed by the Writer.
Defaults to 10 seconds.