Documentation ¶
Index ¶
- Constants
- func NewClient(config ClientConfig, opts ...kgo.Opt) (*kgo.Client, error)
- func NewReaderClient(config ClientConfig, opts ...ReaderClientOption) (*kgo.Client, error)
- func NewTxClient(config ClientConfig, opts ...TxClientOption) (*kgo.Client, error)
- type AsyncWriter
- type AsyncWriterHandler
- type AsyncWriterOption
- type ClientConfig
- type ReaderClientOption
- type ReaderManager
- type ReaderManagerOption
- func WithReaderCommitAll() ReaderManagerOption
- func WithReaderHandlerTimeout(timeout time.Duration) ReaderManagerOption
- func WithReaderLogger(logger *slog.Logger) ReaderManagerOption
- func WithReaderPollBatchSize(size int) ReaderManagerOption
- func WithReaderPollInterval(interval time.Duration) ReaderManagerOption
- func WithReaderWorkerPoolSize(size int) ReaderManagerOption
- type SyncWriter
- type TransactionalWriter
- type TxClientOption
Constants ¶
const (
	// HeaderKeyKafkaTopic is the key for the Kafka topic header.
	HeaderKeyKafkaTopic = "kafka-topic"
	// HeaderKeyKafkaPartition is the key for the Kafka partition header.
	HeaderKeyKafkaPartition = "kafka-partition"
	// HeaderKeyKafkaOffset is the key for the Kafka offset header.
	HeaderKeyKafkaOffset = "kafka-offset"
	// HeaderKeyKafkaTimestamp is the key for the Kafka timestamp header.
	HeaderKeyKafkaTimestamp = "kafka-timestamp"
	// HeaderKeyKafkaTimestampType is the key for the Kafka timestamp type header.
	HeaderKeyKafkaTimestampType = "kafka-timestamp-type"
	// HeaderKeyCompressionType is the key for the compression type header.
	HeaderKeyCompressionType = "compression-type"
)
Variables ¶
This section is empty.
Functions ¶
func NewReaderClient ¶
func NewReaderClient(config ClientConfig, opts ...ReaderClientOption) (*kgo.Client, error)
NewReaderClient creates a new Kafka reader client using the kgo package.
func NewTxClient ¶
func NewTxClient(config ClientConfig, opts ...TxClientOption) (*kgo.Client, error)
NewTxClient creates a new Kafka transactional producer client using the kgo package.
Types ¶
type AsyncWriter ¶
type AsyncWriter struct {
// contains filtered or unexported fields
}
AsyncWriter is a Kafka stream.Writer that writes messages asynchronously.
Compared with SyncWriter, this writer is more efficient for high-throughput scenarios where the application does not need to wait for the result of the write operation.
func NewAsyncWriter ¶
func NewAsyncWriter(client *kgo.Client, opts ...AsyncWriterOption) AsyncWriter
NewAsyncWriter creates a new instance of AsyncWriter.
func (AsyncWriter) Close ¶
func (s AsyncWriter) Close() error
Close closes the underlying Kafka client.
func (AsyncWriter) WriteBatch ¶
type AsyncWriterHandler ¶
AsyncWriterHandler is a function that intercepts the result of a produced message.
func NewAsyncWriterLogHandler ¶
func NewAsyncWriterLogHandler(logger *slog.Logger) AsyncWriterHandler
NewAsyncWriterLogHandler creates a new instance of AsyncWriterHandler that logs the result of a produced message.
type AsyncWriterOption ¶
type AsyncWriterOption func(*asyncStreamWriterOptions)
AsyncWriterOption is a function that modifies the behavior of an AsyncWriter.
func WithAsyncWriterHandler ¶
func WithAsyncWriterHandler(handler AsyncWriterHandler) AsyncWriterOption
WithAsyncWriterHandler sets the handler for an AsyncWriter.
type ClientConfig ¶
type ClientConfig struct {
	SeedBrokers []string `env:"KAFKA_SEED_BROKERS" envSeparator:"," envDefault:"localhost:9092"`
	ClientID    string   `env:"KAFKA_CLIENT_ID"`
}
ClientConfig is a structure used by factory routines generating Kafka clients.
type ReaderClientOption ¶
type ReaderClientOption func(*readerClientOptions)
ReaderClientOption is a functional option for configuring Kafka reader clients created by NewReaderClient.
func WithClientConsumerGroup ¶
func WithClientConsumerGroup(name string) ReaderClientOption
WithClientConsumerGroup sets the consumer group for the Kafka client.
func WithClientReadCommittedOnly ¶
func WithClientReadCommittedOnly() ReaderClientOption
WithClientReadCommittedOnly sets the Kafka client to read committed messages only.
Enable this option if the topic to be read is configured with transactional semantics.
func WithReaderClientBaseOpts ¶
func WithReaderClientBaseOpts(opts ...kgo.Opt) ReaderClientOption
WithReaderClientBaseOpts sets the base kgo options for the Kafka reader client.
type ReaderManager ¶
type ReaderManager struct {
// contains filtered or unexported fields
}
ReaderManager is a Kafka stream.ReaderManager that reads messages from Kafka topics.
func NewReaderManager ¶
func NewReaderManager(client *kgo.Client, opts ...ReaderManagerOption) *ReaderManager
NewReaderManager creates a new instance of ReaderManager.
func (*ReaderManager) Register ¶
func (r *ReaderManager) Register(topic string, handler stream.HandlerFunc)
type ReaderManagerOption ¶
type ReaderManagerOption func(*readerManagerOptions)
func WithReaderCommitAll ¶
func WithReaderCommitAll() ReaderManagerOption
func WithReaderHandlerTimeout ¶
func WithReaderHandlerTimeout(timeout time.Duration) ReaderManagerOption
func WithReaderLogger ¶
func WithReaderLogger(logger *slog.Logger) ReaderManagerOption
func WithReaderPollBatchSize ¶
func WithReaderPollBatchSize(size int) ReaderManagerOption
func WithReaderPollInterval ¶
func WithReaderPollInterval(interval time.Duration) ReaderManagerOption
func WithReaderWorkerPoolSize ¶
func WithReaderWorkerPoolSize(size int) ReaderManagerOption
type SyncWriter ¶
type SyncWriter struct {
// contains filtered or unexported fields
}
SyncWriter is a Kafka stream.Writer that writes messages synchronously.
Compared with AsyncWriter, this writer is more suitable for scenarios where the application needs to wait for the result of the write operation.
func NewSyncWriter ¶
func NewSyncWriter(client *kgo.Client) SyncWriter
NewSyncWriter creates a new instance of SyncWriter.
func (SyncWriter) Close ¶
func (s SyncWriter) Close() error
Close closes the underlying Kafka client.
func (SyncWriter) WriteBatch ¶
type TransactionalWriter ¶
type TransactionalWriter struct {
// contains filtered or unexported fields
}
func NewTransactionalWriter ¶
func NewTransactionalWriter(client *kgo.Client) TransactionalWriter
NewTransactionalWriter creates a new instance of TransactionalWriter.
func (TransactionalWriter) Close ¶
func (s TransactionalWriter) Close() error
Close closes the underlying Kafka client.
func (TransactionalWriter) WriteBatch ¶
type TxClientOption ¶
type TxClientOption func(*txClientOptions)
TxClientOption is a functional option for configuring Kafka transactional clients created by NewTxClient.
func WithClientTxID ¶
func WithClientTxID(id string) TxClientOption
WithClientTxID sets the transactional ID for the Kafka client.
func WithTxClientBaseOpts ¶
func WithTxClientBaseOpts(opts ...kgo.Opt) TxClientOption
WithTxClientBaseOpts sets the base kgo options for the Kafka transactional client.