kafka

package
v0.0.0-...-256bcdb Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 26, 2024 | License: BSD-3-Clause | Imports: 7 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DefaultOptions = Options{
	ConsumeConcurrent: 1000,
	Config:            sarama.NewConfig(),
	Logger:            logs.Default,
}

Functions

This section is empty.

Types

type ConsumeHandler

type ConsumeHandler func(msg *sarama.ConsumerMessage) error

type Kafka

type Kafka struct {
	// contains filtered or unexported fields
}

func New

func New(addr []string, opt ...Options) (*Kafka, error)

func (*Kafka) AsyncProduce

func (this *Kafka) AsyncProduce(topic, data string) (err error)

func (*Kafka) Close

func (this *Kafka) Close()

func (*Kafka) ConsumeGroup

func (this *Kafka) ConsumeGroup(ctx context.Context, group string, topic []string, handler ConsumeHandler)

func (*Kafka) ConsumeNewest

func (this *Kafka) ConsumeNewest(ctx context.Context, topic string, handler ConsumeHandler)

func (*Kafka) ConsumeOldest

func (this *Kafka) ConsumeOldest(ctx context.Context, topic string, handler ConsumeHandler)

func (*Kafka) ConsumePartitionNewest

func (this *Kafka) ConsumePartitionNewest(ctx context.Context, topic string, partition int32, handler ConsumeHandler)

func (*Kafka) ConsumePartitionOldest

func (this *Kafka) ConsumePartitionOldest(ctx context.Context, topic string, partition int32, handler ConsumeHandler)

func (*Kafka) Produce

func (this *Kafka) Produce(topic, data string) (partition int32, offset int64, err error)

type Options

type Options struct {
	ConsumeConcurrent int
	Config            *sarama.Config
	Logger            logs.Logger
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL