kafka

package
v1.3.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 14, 2023 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	KLogger      = &debugLogger{log}
	KErrorLogger = &errorLogger{log}
)

Functions

func GetKafkaDialer added in v1.2.0

func GetKafkaDialer(options ...DialerOption) *kafka.Dialer

func NewGroupReader

func NewGroupReader(brokers []string, topic, groupID string, options ...ReaderOption) (r *kafka.Reader, e error)

func NewPartitionReader

func NewPartitionReader(brokers []string, topic string, partition int, options ...ReaderOption) (r *kafka.Reader, e error)

func NewPartitionWriter

func NewPartitionWriter(brokers []string, topic string, partition int, options ...WriterOption) (w *kafka.Writer, e error)

func NewWriter

func NewWriter(brokers []string, topic string, options ...WriterOption) (w *kafka.Writer, e error)

func SetLog

func SetLog(l logger.Logger)

Types

type DialerOption added in v1.2.0

type DialerOption func(*kafka.Dialer)

func DialerWithCaCertTls added in v1.2.0

func DialerWithCaCertTls(file string) DialerOption

func DialerWithUserAndPasswd added in v1.2.0

func DialerWithUserAndPasswd(user, passwd string) DialerOption

type PartitionBalancer

type PartitionBalancer struct {
	// contains filtered or unexported fields
}

partitionBalancer

func (*PartitionBalancer) Balance

func (p *PartitionBalancer) Balance(_ kafka.Message, _ ...int) (partition int)

type ReaderOption

type ReaderOption func(*kafka.ReaderConfig)

func ReaderWithCommitInterval added in v1.2.0

func ReaderWithCommitInterval(commitInterval time.Duration) ReaderOption

ReaderWithCommitInterval: be careful, setting commitInterval may cause messages to be consumed more than once if the server exits with an error.

func ReaderWithDialer

func ReaderWithDialer(dialer *kafka.Dialer) ReaderOption

func ReaderWithMaxBytes added in v1.2.0

func ReaderWithMaxBytes(maxBytes int) ReaderOption

func ReaderWithMinBytes added in v1.2.0

func ReaderWithMinBytes(minBytes int) ReaderOption

func ReaderWithOffset

func ReaderWithOffset(offset int64) ReaderOption

type WriterOption

type WriterOption func(*kafka.WriterConfig)

func WriterWithAsync

func WriterWithAsync() WriterOption

func WriterWithBatchBytes added in v1.2.0

func WriterWithBatchBytes(batchBytes int) WriterOption

func WriterWithBatchSize added in v1.2.0

func WriterWithBatchSize(batchSize int) WriterOption

func WriterWithBatchTimeout added in v1.2.0

func WriterWithBatchTimeout(batchTimeout time.Duration) WriterOption

func WriterWithDialer added in v1.2.0

func WriterWithDialer(dialer *kafka.Dialer) WriterOption

func WriterWithLogger added in v1.2.0

func WriterWithLogger(debugLogger, errorLogger kafka.Logger) WriterOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL