v3.0.3+incompatible

Published: Jun 24, 2019 License: Apache-2.0 Imports: 5 Imported by: 0




type Config

type Config struct {
	KafkaAddr []string
	// the reader only returns binlogs whose CommitTs is greater than this CommitTS
	CommitTS int64
	Offset   int64 // Kafka offset to start reading from
	// if Topic is empty, the drainer's default name <ClusterID>_obinlog is used
	Topic     string
	ClusterID string
}

Config is the configuration for Reader.
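The Topic fallback described in the field comment can be sketched as follows. This is only an illustration of the naming rule, not the package's code: the Config type here is a local copy of the documented fields, topicName is a hypothetical helper, and the broker address and cluster ID are made-up values.

```go
package main

import "fmt"

// Config is a local stand-in that mirrors the documented fields,
// so the sketch compiles without importing the real package.
type Config struct {
	KafkaAddr []string
	CommitTS  int64
	Offset    int64
	Topic     string
	ClusterID string
}

// topicName is a hypothetical helper, not part of the package API.
// It applies the rule from the Topic field comment: an empty Topic
// falls back to "<ClusterID>_obinlog".
func topicName(cfg Config) string {
	if cfg.Topic != "" {
		return cfg.Topic
	}
	return cfg.ClusterID + "_obinlog"
}

func main() {
	cfg := Config{
		KafkaAddr: []string{"127.0.0.1:9092"}, // assumed broker address
		ClusterID: "6561373978432450126",      // assumed cluster ID
	}
	fmt.Println(topicName(cfg))
}
```

Setting Topic explicitly bypasses the fallback, which may be useful when the drainer was configured with a custom topic name.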

type KafkaSeeker

type KafkaSeeker struct {
	// contains filtered or unexported fields
}

KafkaSeeker seeks offsets in Kafka topics by a given condition.

func NewKafkaSeeker

func NewKafkaSeeker(addr []string, config *sarama.Config) (*KafkaSeeker, error)

NewKafkaSeeker creates an instance of KafkaSeeker

func (*KafkaSeeker) Close

func (ks *KafkaSeeker) Close()

Close releases resources of KafkaSeeker

func (*KafkaSeeker) Seek

func (ks *KafkaSeeker) Seek(topic string, ts int64, partitions []int32) (offsets []int64, err error)

Seek finds the first offset whose binlog CommitTs is greater than ts.
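To make the "first offset with CommitTs greater than ts" condition concrete, here is a minimal in-memory sketch. It is not how KafkaSeeker is implemented (the real type queries Kafka); firstOffsetAfter is a hypothetical function, and it assumes commit timestamps within a partition are non-decreasing by offset, which this page does not state.

```go
package main

import (
	"fmt"
	"sort"
)

// firstOffsetAfter returns the index of the first entry in commitTSs that is
// strictly greater than ts, or len(commitTSs) if no entry qualifies.
// Assumption of this sketch: commitTSs is sorted in non-decreasing order,
// with the slice index playing the role of the Kafka offset.
func firstOffsetAfter(commitTSs []int64, ts int64) int64 {
	i := sort.Search(len(commitTSs), func(i int) bool {
		return commitTSs[i] > ts
	})
	return int64(i)
}

func main() {
	// Made-up commit timestamps for a single partition.
	partition := []int64{100, 105, 105, 110, 120}
	fmt.Println(firstOffsetAfter(partition, 105))
}
```

Note the strict comparison: entries equal to ts are skipped, which matches the "greater than" wording in the Seek and Config.CommitTS descriptions.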

type Message

type Message struct {
	Binlog *pb.Binlog
	Offset int64 // Kafka offset
}

Message is a message read from the Reader.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader reads binlogs from Kafka.

func NewReader

func NewReader(cfg *Config) (r *Reader, err error)

NewReader creates an instance of Reader

func (*Reader) Close

func (r *Reader) Close()

Close shuts down the reader

func (*Reader) Messages

func (r *Reader) Messages() (msgs <-chan *Message)

Messages returns a channel that delivers the buffered messages not yet read.
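A consumer would typically range over the returned channel. The sketch below shows that pattern with local stand-in types so it runs without Kafka: Binlog and Message are simplified copies (the real Binlog is pb.Binlog), consume is a hypothetical helper, and the loop assumes the producer eventually closes the channel, which this page does not confirm for the real Reader.

```go
package main

import "fmt"

// Binlog and Message are local stand-ins for pb.Binlog and the package's
// Message type; only the fields needed for the sketch are included.
type Binlog struct {
	CommitTs int64
}

type Message struct {
	Binlog *Binlog
	Offset int64 // Kafka offset
}

// consume is a hypothetical helper. It reads from msgs until the channel is
// closed and returns how many messages it saw and the offset of the last one
// (-1 if the channel delivered nothing).
func consume(msgs <-chan *Message) (count int, lastOffset int64) {
	lastOffset = -1
	for msg := range msgs {
		count++
		lastOffset = msg.Offset
	}
	return count, lastOffset
}

func main() {
	// Simulate a reader by pre-filling a buffered channel and closing it.
	ch := make(chan *Message, 3)
	for i := int64(0); i < 3; i++ {
		ch <- &Message{Binlog: &Binlog{CommitTs: 100 + i}, Offset: i}
	}
	close(ch)

	count, last := consume(ch)
	fmt.Println(count, last)
}
```

Recording the last offset seen is one way to resume later by passing it back through Config.Offset, under the assumption that the reader honors that field as its starting position.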
