sourceReader

package
v2.0.0-...-acbaf60 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 4, 2023 License: GPL-3.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	QueryTs    = "ts"
	QueryGid   = "g"
	QueryOpGT  = "$gt"
	QueryOpGTE = "$gte"
)
View Source
const (
	ErrInvalidStartPosition = "resume point may no longer be in the oplog."
)

Variables

View Source
var (
	BatchSize   = conf.Options.IncrSyncReaderFetchBatchSize
	ChannelSize = BatchSize * 10
)
View Source
var CollectionCappedError = errors.New("collection capped error")
View Source
var TimeoutError = errors.New("read next log timeout, It shouldn't be happen")

TimeoutError indicates that a MongoDB query timed out while reading the next log.

Functions

This section is empty.

Types

type EventReader

type EventReader struct {
	// contains filtered or unexported fields
}

func NewEventReader

func NewEventReader(src string, replset string) *EventReader

NewEventReader creates a reader from a MongoDB URL.

func (*EventReader) EnsureNetwork

func (er *EventReader) EnsureNetwork() error

func (*EventReader) FetchNewestTimestamp

func (er *EventReader) FetchNewestTimestamp() (interface{}, error)

func (*EventReader) Name

func (er *EventReader) Name() string

func (*EventReader) Next

func (er *EventReader) Next() ([]byte, error)

Next returns the next oplog as raw bytes ([]byte).

func (*EventReader) SetQueryTimestampOnEmpty

func (er *EventReader) SetQueryTimestampOnEmpty(ts interface{})

SetQueryTimestampOnEmpty sets the internal query timestamp if this reader does not already have one. This mostly happens during the initial stage.

func (*EventReader) StartFetcher

func (er *EventReader) StartFetcher()

StartFetcher starts the fetcher if it does not already exist.

func (*EventReader) String

func (er *EventReader) String() string

func (*EventReader) UpdateQueryTimestamp

func (er *EventReader) UpdateQueryTimestamp(ts int64)

type GidOplogReader

type GidOplogReader struct {
	OplogReader
}

GidOplogReader is an OplogReader whose query also filters by gid.

func NewGidOplogReader

func NewGidOplogReader(src string) *GidOplogReader

func (*GidOplogReader) SetQueryGid

func (reader *GidOplogReader) SetQueryGid(gid string)

type OplogReader

type OplogReader struct {
	// contains filtered or unexported fields
}

OplogReader represents a stream reader for the MongoDB instance specified by a URL. Together with its query options, it lets the user iterate over oplogs.

func NewOplogReader

func NewOplogReader(src string, replset string) *OplogReader

NewOplogReader creates a reader from a MongoDB URL.

func (*OplogReader) EnsureNetwork

func (or *OplogReader) EnsureNetwork() (err error)

EnsureNetwork establishes the MongoDB connection first if the current connection is not ready or has been disconnected.

func (*OplogReader) FetchNewestTimestamp

func (or *OplogReader) FetchNewestTimestamp() (interface{}, error)

func (*OplogReader) Name

func (or *OplogReader) Name() string

func (*OplogReader) Next

func (or *OplogReader) Next() ([]byte, error)

Next returns the next oplog as raw bytes ([]byte).

func (*OplogReader) NextOplog

func (or *OplogReader) NextOplog() (log *oplog.GenericOplog, err error)

NextOplog returns the next oplog as an oplog.GenericOplog struct.

func (*OplogReader) SetQueryTimestampOnEmpty

func (or *OplogReader) SetQueryTimestampOnEmpty(ts interface{})

SetQueryTimestampOnEmpty sets the internal query timestamp if this reader does not already have one. This mostly happens during the initial stage.

func (*OplogReader) StartFetcher

func (or *OplogReader) StartFetcher()

StartFetcher starts the fetcher if it does not already exist.

func (*OplogReader) String

func (or *OplogReader) String() string

func (*OplogReader) UpdateQueryTimestamp

func (or *OplogReader) UpdateQueryTimestamp(ts int64)

type Reader

type Reader interface {
	Name() string                               // reader name
	StartFetcher()                              // start fetcher
	SetQueryTimestampOnEmpty(interface{})       // set query timestamp when first start
	UpdateQueryTimestamp(int64)                 // update query timestamp
	Next() ([]byte, error)                      // fetch next oplog/event
	EnsureNetwork() error                       // ensure network
	FetchNewestTimestamp() (interface{}, error) // only used in EventReader that fetch PBRT
}

func CreateReader

func CreateReader(fetchMethod string, src string, replset string) (Reader, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL