kafka

package
Version: v0.0.4 (Latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 9, 2022 | License: Apache-2.0 | Imports: 9 | Imported by: 5

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewConsumer

func NewConsumer(address []string, topic string, group string, cfg *event.Config_Kafka) (event.Consumer, error)

Types

type Consumer

type Consumer struct {
	sarama.ConsumerGroup
	// contains filtered or unexported fields
}

func (*Consumer) Close

func (k *Consumer) Close() error

func (*Consumer) GetTracer

func (k *Consumer) GetTracer() trace.Tracer

func (*Consumer) Process

func (k *Consumer) Process(ctx context.Context, handler event.ConsumerHandler) error

func (*Consumer) ReportSpanAttr

func (k *Consumer) ReportSpanAttr() []attribute.KeyValue

type Producer

type Producer struct {
	sarama.SyncProducer
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(address []string, topic string, cfg *event.Config_Kafka) (*Producer, error)

func (*Producer) BatchSend

func (s *Producer) BatchSend(ctx context.Context, message []event.Event) error

func (*Producer) Close

func (s *Producer) Close() error

func (*Producer) GetTracer

func (s *Producer) GetTracer() trace.Tracer

func (*Producer) ReportSpanAttr

func (s *Producer) ReportSpanAttr() []attribute.KeyValue

func (*Producer) Send

func (s *Producer) Send(ctx context.Context, message event.Event) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL