kafka

package
v0.1.1
Warning

This package is not in the latest version of its module.

Published: Feb 20, 2020 | License: MIT | Imports: 5 | Imported by: 0

Documentation

Overview

Package kafka implements an Apache Kafka driver for qRPC.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	TopicPrefix string
	// contains filtered or unexported fields
}

Consumer is a wrapper around kafka.Reader. It implements the qrpc.Consumer interface.

func NewConsumer

func NewConsumer(cfg *kafka.ReaderConfig) *Consumer

NewConsumer allocates a new Consumer object.

func (Consumer) Close

func (c Consumer) Close() error

Close closes the consumer connection.

func (Consumer) Consume

func (c Consumer) Consume(mh qrpc.MessageHandler) error

Consume runs one consume iteration: it fetches a message from Kafka, calls the qrpc.MessageHandler to process it, and commits the message.

func (Consumer) Subscribe

func (c Consumer) Subscribe(queues []string) error

Subscribe subscribes the consumer to multiple topics (queues) and starts it. It must be called before Consume.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer is a wrapper around kafka.Writer. It implements the qrpc.Producer interface.

func NewProducer

func NewProducer(cfg *kafka.WriterConfig) *Producer

NewProducer allocates a new Producer object.

func (Producer) Close

func (p Producer) Close() error

Close closes the producer connection.

func (Producer) Produce

func (p Producer) Produce(ctx context.Context, msg qrpc.Message) error

Produce sends the message to the topic.

func (Producer) SetQueue

func (p Producer) SetQueue(queue string)

SetQueue sets the producer topic (queue) and starts the producer process. It must be called before Produce.

type Reader

type Reader interface {
	Init(cfg *kafka.ReaderConfig)
	FetchMessage(ctx context.Context) (kafka.Message, error)
	CommitMessages(ctx context.Context, msgs ...kafka.Message) error
	Close() error
}

Reader is an abstract reader interface that wraps kafka.Reader.

type Writer

type Writer interface {
	Init(cfg *kafka.WriterConfig)
	WriteMessages(ctx context.Context, msgs ...kafka.Message) error
	Close() error
}

Writer is an abstract writer interface that wraps kafka.Writer.
