Documentation ¶
Index ¶
- Constants
- func DDLSupportForSharding() bool
- func TransformDDL(replset string, log *oplog.PartialLog, shardColSpec *utils.ShardCollectionSpec, ...) []*oplog.PartialLog
- type Batcher
- type CheckpointManager
- func (manager *CheckpointManager) Flush(tablePrefix string) error
- func (manager *CheckpointManager) FlushAll() error
- func (manager *CheckpointManager) Get(replset string) bson.MongoTimestamp
- func (manager *CheckpointManager) GetTableList(tablePrefix string) []string
- func (manager *CheckpointManager) Load(tablePrefix string) error
- func (manager *CheckpointManager) LoadAll() error
- type DDLKey
- type DDLManager
- func (manager *DDLManager) BlockDDL(replset string, log *oplog.PartialLog) bool
- func (manager *DDLManager) Flush(tablePrefix string) error
- func (manager *DDLManager) GetTableList(tablePrefix string) []string
- func (manager *DDLManager) Load(tablePrefix string) error
- func (manager *DDLManager) UnBlockDDL(replset string, log *oplog.PartialLog)
- type DDLValue
- type MCIItem
- type Module
- type MoveChunkKey
- type MoveChunkManager
- func (manager *MoveChunkManager) BarrierOplog(replset string, partialLog *oplog.PartialLog) (bool, bool, interface{})
- func (manager *MoveChunkManager) Flush(tablePrefix string) error
- func (manager *MoveChunkManager) GetTableList(tablePrefix string) []string
- func (manager *MoveChunkManager) Load(tablePrefix string) error
- type MoveChunkValue
- type OplogHandler
- type OplogSyncer
- type Persist
- type ReplicationCoordinator
- type SyncerMoveChunk
- type TransferEventListener
- type Worker
- type WriteController
Constants ¶
const (
    StorageTypeAPI                = "api"
    StorageTypeDB                 = "database"
    CheckpointMoveChunkIntervalMS = 5000
)
const (
    CheckpointKeyNs        = "ns"
    CheckpointKeyObject    = "obj"
    CheckpointBlocklog     = "blockLog"
    CheckpointDBMap        = "dbMap"
    DDLCheckInterval       = 1  // unit: seconds
    DDLUnResponseThreshold = 30 // unit: seconds
)
const (
    MoveChunkBarrierKey          = "barrierKey"
    MoveChunkKeyName             = "key"
    MoveChunkInsertMap           = "insertMap"
    MoveChunkDeleteItem          = "deleteItem"
    MoveChunkBufferSize          = 1000
    MoveChunkUnResponseThreshold = 30 // unit: seconds
)
const (
    SYNCMODE_ALL      = "all"
    SYNCMODE_DOCUMENT = "document"
    SYNCMODE_OPLOG    = "oplog"
)
const (
    // bson deserialization is a CPU-intensive workload
    PipelineQueueMaxNr     = 4
    PipelineQueueMinNr     = 1
    PipelineQueueLen       = 64
    WaitBarrierAckLogTimes = 10
    DurationTime           = 6000 // unit: ms
    DDLCheckpointGap       = 5    // unit: seconds
    FilterCheckpointGap    = 180  // unit: seconds; if no checkpoint update occurs within this gap, flush the checkpoint mandatorily
    ShardingWorkerId       = 0
)
const (
    MaxUnAckListLength    = 128 * 256
    DDLCheckpointInterval = 300 // unit: ms
)
const (
    WaitAckIntervalMS = 1000
)
Variables ¶
This section is empty.
Functions ¶
func DDLSupportForSharding ¶
func DDLSupportForSharding() bool
func TransformDDL ¶
func TransformDDL(replset string, log *oplog.PartialLog, shardColSpec *utils.ShardCollectionSpec, toIsSharding bool) []*oplog.PartialLog
Types ¶
type Batcher ¶
type Batcher struct {
// contains filtered or unexported fields
}
As mentioned in syncer.go, Batcher is used to batch oplogs before sending them, in order to improve performance.
func NewBatcher ¶
func NewBatcher(syncer *OplogSyncer, filterList filter.OplogFilterChain, handler OplogHandler, workerGroup []*Worker) *Batcher
func (*Batcher) Next ¶
func (batcher *Batcher) Next() []*oplog.GenericOplog
Next returns the batched oplogs and a barrier flag. For the last oplog: if the current batch is empty (the first oplog in this batch is a DDL), the last oplog of the previous batch is returned instead; right after startup, this is nil.
func (*Batcher) WaitAllAck ¶
func (batcher *Batcher) WaitAllAck()
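Taken together, these methods suggest a consume loop: fetch a batch with Next, hand it off, and wait for acknowledgement. The following is a minimal sketch only, assuming the import paths shown and a caller-supplied dispatch step; NewBatcher, Next, and WaitAllAck are the documented calls.

package example

import (
    collector "github.com/alibaba/MongoShake/collector" // assumed import path
    "github.com/alibaba/MongoShake/oplog"               // assumed import path
)

// drainBatches is a hypothetical driver loop; dispatch stands in for
// handing the batch to the workers.
func drainBatches(batcher *collector.Batcher, dispatch func([]*oplog.GenericOplog)) {
    for {
        // Per the doc above, Next may return the last oplog of the
        // previous batch when the current batch is empty.
        batch := batcher.Next()
        if len(batch) == 0 {
            continue
        }
        dispatch(batch)
        batcher.WaitAllAck() // block until every worker has acknowledged the batch
    }
}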
type CheckpointManager ¶
type CheckpointManager struct {
    FlushChan chan bool
    // contains filtered or unexported fields
}
func NewCheckpointManager ¶
func NewCheckpointManager(startPosition int64) *CheckpointManager
func (*CheckpointManager) Flush ¶
func (manager *CheckpointManager) Flush(tablePrefix string) error
func (*CheckpointManager) FlushAll ¶
func (manager *CheckpointManager) FlushAll() error
func (*CheckpointManager) Get ¶
func (manager *CheckpointManager) Get(replset string) bson.MongoTimestamp
func (*CheckpointManager) GetTableList ¶
func (manager *CheckpointManager) GetTableList(tablePrefix string) []string
func (*CheckpointManager) Load ¶
func (manager *CheckpointManager) Load(tablePrefix string) error
func (*CheckpointManager) LoadAll ¶
func (manager *CheckpointManager) LoadAll() error
LoadAll first loads checkpoint info into the CheckpointManager, before any concurrent access begins.
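A minimal sketch of that call order, assuming the import paths shown (the constructor, LoadAll, and Get are the documented calls):

package example

import (
    collector "github.com/alibaba/MongoShake/collector" // assumed import path
    "gopkg.in/mgo.v2/bson"                              // assumed bson package
)

// resumeFrom is a hypothetical helper: per the doc above, LoadAll must run
// first, before the manager is accessed concurrently.
func resumeFrom(manager *collector.CheckpointManager, replset string) (bson.MongoTimestamp, error) {
    if err := manager.LoadAll(); err != nil {
        return 0, err
    }
    // Get returns the checkpoint timestamp to resume this replica set from.
    return manager.Get(replset), nil
}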
type DDLManager ¶
type DDLManager struct {
    FromCsConn   *utils.MongoConn // connection to the shard config server
    ToIsSharding bool
    // contains filtered or unexported fields
}
func NewDDLManager ¶
func NewDDLManager(ckptManager *CheckpointManager) *DDLManager
func (*DDLManager) BlockDDL ¶
func (manager *DDLManager) BlockDDL(replset string, log *oplog.PartialLog) bool
func (*DDLManager) Flush ¶
func (manager *DDLManager) Flush(tablePrefix string) error
func (*DDLManager) GetTableList ¶
func (manager *DDLManager) GetTableList(tablePrefix string) []string
func (*DDLManager) Load ¶
func (manager *DDLManager) Load(tablePrefix string) error
func (*DDLManager) UnBlockDDL ¶
func (manager *DDLManager) UnBlockDDL(replset string, log *oplog.PartialLog)
type MCIItem ¶
type MCIItem struct {
    Replset   string              `bson:"replset"`
    Timestamp bson.MongoTimestamp `bson:"deleteTs"`
}
type Module ¶
type Module interface {
    IsRegistered() bool

    /**
     * Module installation and initialization. Returns false on failure.
     * Invoked only while the WriteController is preparing.
     */
    Install() bool

    /**
     * Handle an outstanding request message. Messages are passed one by
     * one, and any change made to a message in Handle() is preserved and
     * delivered to the next module.
     *
     * @return tunnel's error code (<0) or ack value
     */
    Handle(message *tunnel.WMessage) int64
}
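Because the contract is spelled out in the comments, a hypothetical implementation is easy to sketch. Everything below except the three method signatures is illustrative, including the tunnel import path:

package example

import "github.com/alibaba/MongoShake/tunnel" // assumed import path

// countModule is a hypothetical Module that only counts messages.
type countModule struct {
    registered bool
    acked      int64
}

func (m *countModule) IsRegistered() bool { return m.registered }

// Install runs while the WriteController is preparing; returning false
// would signal a failed initialization.
func (m *countModule) Install() bool {
    m.registered = true
    return true
}

// Handle sees messages one by one; any change made to message here would
// be preserved and delivered to the next module. Negative returns are
// tunnel error codes, non-negative returns are ack values.
func (m *countModule) Handle(message *tunnel.WMessage) int64 {
    m.acked++
    return m.acked
}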
type MoveChunkKey ¶
type MoveChunkKey struct {
    Id        interface{} `bson:"docId"`
    Namespace string      `bson:"namespace"`
}
type MoveChunkManager ¶
type MoveChunkManager struct {
// contains filtered or unexported fields
}
func NewMoveChunkManager ¶
func NewMoveChunkManager(ckptManager *CheckpointManager) *MoveChunkManager
func (*MoveChunkManager) BarrierOplog ¶
func (manager *MoveChunkManager) BarrierOplog(replset string, partialLog *oplog.PartialLog) (bool, bool, interface{})
func (*MoveChunkManager) Flush ¶
func (manager *MoveChunkManager) Flush(tablePrefix string) error
func (*MoveChunkManager) GetTableList ¶
func (manager *MoveChunkManager) GetTableList(tablePrefix string) []string
func (*MoveChunkManager) Load ¶
func (manager *MoveChunkManager) Load(tablePrefix string) error
type MoveChunkValue ¶
type MoveChunkValue struct {
// contains filtered or unexported fields
}
type OplogHandler ¶
type OplogHandler interface {
    // Handle is invoked on every oplog consumed.
    Handle(log *oplog.PartialLog)
}
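A hypothetical implementation, assuming the oplog import path and that PartialLog exposes a Namespace field:

package example

import (
    "sync"

    "github.com/alibaba/MongoShake/oplog" // assumed import path
)

// nsCounter is a hypothetical OplogHandler that tallies oplogs per
// namespace; Handle is invoked once for every oplog consumed.
type nsCounter struct {
    mu    sync.Mutex
    perNs map[string]int
}

func (c *nsCounter) Handle(log *oplog.PartialLog) {
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.perNs == nil {
        c.perNs = make(map[string]int)
    }
    c.perNs[log.Namespace]++ // assumes PartialLog exposes a Namespace field
}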
type OplogSyncer ¶
type OplogSyncer struct {
    OplogHandler
    // contains filtered or unexported fields
}
OplogSyncer polls oplogs from the source MongoDB.
func NewOplogSyncer ¶
func NewOplogSyncer(
    coordinator *ReplicationCoordinator,
    replset string,
    fullSyncFinishPosition int64,
    mongoUrl string,
    gids []string,
    ckptManager *CheckpointManager,
    mvckManager *MoveChunkManager,
    ddlManager *DDLManager) *OplogSyncer
Syncer is used to fetch oplogs from the source MongoDB and send them to different workers, which can be seen as network senders. Several syncers coexist to improve fetching performance. The data flow in a syncer is:

source mongodb --> reader --> pending queue (raw data) --> logs queue (parsed data) --> worker

The pending queue and the logs queue are split so that network fetching and CPU-intensive bson parsing can overlap, which improves performance.
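The sketch below models that queue split in isolation; it is a conceptual illustration, not this package's actual implementation:

package example

import "github.com/alibaba/MongoShake/oplog" // assumed import path

// pipeline mimics the queue split described above: an I/O-bound reader
// feeds raw bson into the pending queue, and a separate CPU-bound stage
// deserializes it into the logs queue, so fetching and parsing overlap.
func pipeline(raw <-chan []byte, parse func([]byte) *oplog.PartialLog) <-chan *oplog.PartialLog {
    pendingQueue := make(chan []byte, 64)         // raw data
    logsQueue := make(chan *oplog.PartialLog, 64) // parsed data
    go func() {
        for doc := range raw {
            pendingQueue <- doc // reader stage: network I/O
        }
        close(pendingQueue)
    }()
    go func() {
        for doc := range pendingQueue {
            logsQueue <- parse(doc) // parser stage: CPU-intensive bson decode
        }
        close(logsQueue)
    }()
    return logsQueue
}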
func (*OplogSyncer) Handle ¶
func (sync *OplogSyncer) Handle(log *oplog.PartialLog)
func (*OplogSyncer) RestAPI ¶
func (sync *OplogSyncer) RestAPI()
type ReplicationCoordinator ¶
type ReplicationCoordinator struct {
    Sources []*utils.MongoSource
    // contains filtered or unexported fields
}
ReplicationCoordinator is the global coordinator instance. It consists of one syncer group and a number of workers.
func (*ReplicationCoordinator) Run ¶
func (coordinator *ReplicationCoordinator) Run() error
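A hedged end-to-end sketch; the import paths and the MongoSource field names are assumptions, and only Sources and Run appear on this page:

package main

import (
    "log"

    collector "github.com/alibaba/MongoShake/collector" // assumed import path
    utils "github.com/alibaba/MongoShake/common"        // assumed import path
)

func main() {
    coordinator := &collector.ReplicationCoordinator{
        Sources: []*utils.MongoSource{
            {URL: "mongodb://127.0.0.1:27017", ReplicaName: "rs0"}, // hypothetical field names
        },
    }
    // Run blocks while replication proceeds and returns an error on failure.
    if err := coordinator.Run(); err != nil {
        log.Fatalf("replication failed: %v", err)
    }
}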
type SyncerMoveChunk ¶
type SyncerMoveChunk struct {
// contains filtered or unexported fields
}
type TransferEventListener ¶
type TransferEventListener struct {
// contains filtered or unexported fields
}
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
func NewWorker ¶
func NewWorker(coordinator *ReplicationCoordinator, syncer *OplogSyncer, id uint32) *Worker
func (*Worker) IsAllAcked ¶
func (worker *Worker) IsAllAcked() bool
func (*Worker) Offer ¶
func (worker *Worker) Offer(batch []*oplog.GenericOplog)
type WriteController ¶
type WriteController struct {
    // current max lsn_ack value
    LatestLsnAck int64
    // contains filtered or unexported fields
}
func NewWriteController ¶
func NewWriteController(worker *Worker) *WriteController
func (*WriteController) Send ¶
func (controller *WriteController) Send(logs []*oplog.GenericOplog, tag uint32) int64
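A hedged usage sketch; reading Send's return value the same way as Module.Handle's (negative values are tunnel error codes, otherwise an ack value) is an assumption:

package example

import (
    "fmt"

    collector "github.com/alibaba/MongoShake/collector" // assumed import path
    "github.com/alibaba/MongoShake/oplog"               // assumed import path
)

// sendBatch is a hypothetical wrapper around Send.
func sendBatch(controller *collector.WriteController, logs []*oplog.GenericOplog, tag uint32) error {
    if ret := controller.Send(logs, tag); ret < 0 {
        return fmt.Errorf("tunnel error code %d", ret)
    }
    return nil
}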