kafka

package
v0.1.6 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 30, 2025 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// HeaderKeyKafkaTopic is the key for the Kafka topic header.
	HeaderKeyKafkaTopic = "kafka-topic"
	// HeaderKeyKafkaPartition is the key for the Kafka partition header.
	HeaderKeyKafkaPartition = "kafka-partition"
	// HeaderKeyKafkaOffset is the key for the Kafka offset header.
	HeaderKeyKafkaOffset = "kafka-offset"
	// HeaderKeyKafkaTimestamp is the key for the Kafka timestamp header.
	HeaderKeyKafkaTimestamp = "kafka-timestamp"
	// HeaderKeyKafkaTimestampType is the key for the Kafka timestamp type header.
	HeaderKeyKafkaTimestampType = "kafka-timestamp-type"
	// HeaderKeyCompressionType is the key for the compression type header.
	HeaderKeyCompressionType = "compression-type"
)

Variables

This section is empty.

Functions

func NewClient

func NewClient(config ClientConfig, opts ...kgo.Opt) (*kgo.Client, error)

NewClient creates a new Kafka client using the kgo package.

func NewReaderClient

func NewReaderClient(config ClientConfig, opts ...ReaderClientOption) (*kgo.Client, error)

NewReaderClient creates a new Kafka reader client using the kgo package.

func NewTxClient

func NewTxClient(config ClientConfig, opts ...TxClientOption) (*kgo.Client, error)

NewTxClient creates a new Kafka transactional producer client using the kgo package.

Types

type AsyncWriter

type AsyncWriter struct {
	// contains filtered or unexported fields
}

AsyncWriter is a Kafka stream.Writer that writes messages asynchronously.

Compared with the SyncWriter option, this writer is more efficient for high-throughput scenarios where the application does not need to wait for the result of the write operation.

func NewAsyncWriter

func NewAsyncWriter(client *kgo.Client, opts ...AsyncWriterOption) AsyncWriter

NewAsyncWriter creates a new instance of AsyncWriter.

func (AsyncWriter) Close

func (s AsyncWriter) Close() error

Close closes the underlying Kafka client.

func (AsyncWriter) Write

func (s AsyncWriter) Write(ctx context.Context, name string, message stream.Message) error

func (AsyncWriter) WriteBatch

func (s AsyncWriter) WriteBatch(ctx context.Context, name string, messages []stream.Message) (int, error)

type AsyncWriterHandler

type AsyncWriterHandler func(record *kgo.Record, err error)

AsyncWriterHandler is a function that intercepts the result of a produced message.

func NewAsyncWriterLogHandler

func NewAsyncWriterLogHandler(logger *slog.Logger) AsyncWriterHandler

NewAsyncWriterLogHandler creates a new instance of AsyncWriterHandler that logs the result of a produced message.

type AsyncWriterOption

type AsyncWriterOption func(*asyncStreamWriterOptions)

AsyncWriterOption is a function that modifies the behavior of an AsyncWriter.

func WithAsyncWriterHandler

func WithAsyncWriterHandler(handler AsyncWriterHandler) AsyncWriterOption

WithAsyncWriterHandler sets the handler for an AsyncWriter.

type ClientConfig

type ClientConfig struct {
	SeedBrokers []string `env:"KAFKA_SEED_BROKERS" envSeparator:"," envDefault:"localhost:9092"`
	ClientID    string   `env:"KAFKA_CLIENT_ID"`
}

ClientConfig holds the settings (seed brokers and client ID) shared by the client constructors NewClient, NewReaderClient, and NewTxClient.

type ReaderClientOption

type ReaderClientOption func(*readerClientOptions)

ReaderClientOption is a functional option for configuring Kafka reader clients created by NewReaderClient.

func WithClientConsumerGroup

func WithClientConsumerGroup(name string) ReaderClientOption

WithClientConsumerGroup sets the consumer group for the Kafka client.

func WithClientReadCommittedOnly

func WithClientReadCommittedOnly() ReaderClientOption

WithClientReadCommittedOnly configures the Kafka client to read only committed messages.

Enable this option if the topic to be read is configured with transactional semantics.

func WithReaderClientBaseOpts

func WithReaderClientBaseOpts(opts ...kgo.Opt) ReaderClientOption

WithReaderClientBaseOpts sets the base options for the Kafka client.

type ReaderManager

type ReaderManager struct {
	// contains filtered or unexported fields
}

ReaderManager is a Kafka stream.ReaderManager that reads messages from Kafka topics.

func NewReaderManager

func NewReaderManager(client *kgo.Client, opts ...ReaderManagerOption) *ReaderManager

NewReaderManager creates a new instance of ReaderManager.

func (*ReaderManager) Close

func (r *ReaderManager) Close(ctx context.Context) error

func (*ReaderManager) Register

func (r *ReaderManager) Register(topic string, handler stream.HandlerFunc)

func (*ReaderManager) Start

func (r *ReaderManager) Start(ctx context.Context) error

type ReaderManagerOption

type ReaderManagerOption func(*readerManagerOptions)

func WithReaderCommitAll

func WithReaderCommitAll() ReaderManagerOption

func WithReaderHandlerTimeout

func WithReaderHandlerTimeout(timeout time.Duration) ReaderManagerOption

func WithReaderLogger

func WithReaderLogger(logger *slog.Logger) ReaderManagerOption

func WithReaderPollBatchSize

func WithReaderPollBatchSize(size int) ReaderManagerOption

func WithReaderPollInterval

func WithReaderPollInterval(interval time.Duration) ReaderManagerOption

func WithReaderWorkerPoolSize

func WithReaderWorkerPoolSize(size int) ReaderManagerOption

type SyncWriter

type SyncWriter struct {
	// contains filtered or unexported fields
}

SyncWriter is a Kafka stream.Writer that writes messages synchronously.

Compared with the AsyncWriter option, this writer is more suitable for scenarios where the application needs to wait for the result of the write operation.

func NewSyncWriter

func NewSyncWriter(client *kgo.Client) SyncWriter

NewSyncWriter creates a new instance of SyncWriter.

func (SyncWriter) Close

func (s SyncWriter) Close() error

Close closes the underlying Kafka client.

func (SyncWriter) Write

func (s SyncWriter) Write(ctx context.Context, name string, message stream.Message) error

func (SyncWriter) WriteBatch

func (s SyncWriter) WriteBatch(ctx context.Context, name string, messages []stream.Message) (int, error)

type TransactionalWriter

type TransactionalWriter struct {
	// contains filtered or unexported fields
}

func NewTransactionalWriter

func NewTransactionalWriter(client *kgo.Client) TransactionalWriter

NewTransactionalWriter creates a new instance of TransactionalWriter.

func (TransactionalWriter) Close

func (s TransactionalWriter) Close() error

Close closes the underlying Kafka client.

func (TransactionalWriter) Write

func (s TransactionalWriter) Write(ctx context.Context, name string, message stream.Message) error

func (TransactionalWriter) WriteBatch

func (s TransactionalWriter) WriteBatch(ctx context.Context, name string, messages []stream.Message) (int, error)

type TxClientOption

type TxClientOption func(*txClientOptions)

TxClientOption is a functional option for configuring transactional Kafka clients created by NewTxClient.

func WithClientTxID

func WithClientTxID(id string) TxClientOption

WithClientTxID sets the transactional ID for the Kafka client.

func WithTxClientBaseOpts

func WithTxClientBaseOpts(opts ...kgo.Opt) TxClientOption

WithTxClientBaseOpts sets the base options for the Kafka client.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL