docsyncer

package
v0.0.0-...-d253ebd Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2021 License: GPL-3.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MAX_BUFFER_BYTE_SIZE = 12 * 1024 * 1024
	SpliterReader        = 4
)

Variables

View Source
var (
	GlobalCollExecutorId int32 = -1
	GlobalDocExecutorId  int32 = -1
)

Functions

func Checkpoint

func Checkpoint(ckptMap map[string]utils.TimestampNode) error

func GenerateCollExecutorId

func GenerateCollExecutorId() int

func GenerateDocExecutorId

func GenerateDocExecutorId() int

func GetAllNamespace

func GetAllNamespace(sources []*utils.MongoSource) (map[utils.NS]struct{}, map[string][]string, error)

*

  • return all namespace. return:
  • @map[utils.NS]struct{}: namespace set where key is the namespace while value is useless, e.g., "a.b"->nil, "a.c"->nil
  • @map[string][]string: db->collection map. e.g., "a"->[]string{"b", "c"}
  • @error: error info

func GetDbNamespace

func GetDbNamespace(url string) ([]utils.NS, map[string][]string, error)

*

  • return db namespace. return:
  • @[]utils.NS: namespace list, e.g., []{"a.b", "a.c"}
  • @map[string][]string: db->collection map. e.g., "a"->[]string{"b", "c"}
  • @error: error info

func IsShardingToSharding

func IsShardingToSharding(fromIsSharding bool, toConn *utils.MongoConn) bool

func StartDropDestCollection

func StartDropDestCollection(nsSet map[utils.NS]struct{}, toConn *utils.MongoConn,
	nsTrans *transform.NamespaceTransform) error

func StartIndexSync

func StartIndexSync(indexMap map[utils.NS][]mgo.Index, toUrl string,
	nsTrans *transform.NamespaceTransform, background bool) (syncError error)

func StartNamespaceSpecSyncForSharding

func StartNamespaceSpecSyncForSharding(csUrl string, toConn *utils.MongoConn,
	nsTrans *transform.NamespaceTransform) error

Types

type CollectionExecutor

type CollectionExecutor struct {
	// contains filtered or unexported fields
}

func NewCollectionExecutor

func NewCollectionExecutor(id int, mongoUrl string, ns utils.NS, syncer *DBSyncer) *CollectionExecutor

func (*CollectionExecutor) Start

func (colExecutor *CollectionExecutor) Start() error

func (*CollectionExecutor) Sync

func (colExecutor *CollectionExecutor) Sync(docs []*bson.Raw)

func (*CollectionExecutor) Wait

func (colExecutor *CollectionExecutor) Wait() error

type CollectionMetric

type CollectionMetric struct {
	CollectionStatus Status
	TotalCount       uint64
	FinishCount      uint64
}

func NewCollectionMetric

func NewCollectionMetric() *CollectionMetric

func (*CollectionMetric) String

func (cm *CollectionMetric) String() string

type DBSyncer

type DBSyncer struct {

	// source mongodb url
	FromMongoUrl string

	// destination mongodb url
	ToMongoUrl string

	// source is sharding?
	FromIsSharding bool
	// contains filtered or unexported fields
}

********************************************************************** 1 shard -> 1 DBSyncer

func NewDBSyncer

func NewDBSyncer(
	id int,
	fromMongoUrl string,
	fromReplset string,
	toMongoUrl string,
	nsTrans *transform.NamespaceTransform,
	orphanFilter *filter.OrphanFilter,
	qos *utils.Qos,
	fromIsSharding bool) *DBSyncer

func (*DBSyncer) Close

func (syncer *DBSyncer) Close()

func (*DBSyncer) GetIndexMap

func (syncer *DBSyncer) GetIndexMap() map[utils.NS][]mgo.Index

@deprecated

func (*DBSyncer) Init

func (syncer *DBSyncer) Init()

func (*DBSyncer) RestAPI

func (syncer *DBSyncer) RestAPI()

********************************************************************** restful api

func (*DBSyncer) Start

func (syncer *DBSyncer) Start() (syncError error)

func (*DBSyncer) String

func (syncer *DBSyncer) String() string

type DocExecutor

type DocExecutor struct {
	// contains filtered or unexported fields
}

func NewDocExecutor

func NewDocExecutor(id int, colExecutor *CollectionExecutor, session *mgo.Session, syncer *DBSyncer) *DocExecutor

func (*DocExecutor) String

func (exec *DocExecutor) String() string

type DocumentReader

type DocumentReader struct {
	// contains filtered or unexported fields
}

*********************************************** DocumentReader: the reader of single piece

func NewDocumentReader

func NewDocumentReader(src string, ns utils.NS, start, end interface{}) *DocumentReader

NewDocumentReader creates reader with mongodb url

func (*DocumentReader) Close

func (reader *DocumentReader) Close()

func (*DocumentReader) CloseMgo

func (reader *DocumentReader) CloseMgo()

deprecate, used for mgo

func (*DocumentReader) NextDoc

func (reader *DocumentReader) NextDoc() (doc *bson.Raw, err error)

NextDoc returns an document by raw bytes which is []byte

func (*DocumentReader) NextDocMgo

func (reader *DocumentReader) NextDocMgo() (doc *bson.Raw, err error)

deprecate, used for mgo

func (*DocumentReader) String

func (reader *DocumentReader) String() string

type DocumentSplitter

type DocumentSplitter struct {
	// contains filtered or unexported fields
}

*********************************************** splitter: pre-split the collection into several pieces

func NewDocumentSplitter

func NewDocumentSplitter(src string, ns utils.NS) *DocumentSplitter

func (*DocumentSplitter) Close

func (ds *DocumentSplitter) Close()

func (*DocumentSplitter) GetIndexes

func (ds *DocumentSplitter) GetIndexes() ([]mgo.Index, error)

@deprecated

func (*DocumentSplitter) Run

func (ds *DocumentSplitter) Run() error

TODO, need add retry

func (*DocumentSplitter) String

func (ds *DocumentSplitter) String() string

type Status

type Status string
const (
	StatusWaitStart  Status = "wait start"
	StatusProcessing Status = "in processing"
	StatusFinish     Status = "finish"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL