utils

package
v2.0.0-...-acbaf60 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 4, 2023 License: GPL-3.0 Imports: 32 Imported by: 1

Documentation

Index

Constants

View Source
const (
	GlobalDiagnosticPath = "diagnostic"
	// This is the time of golang was born to the world
	GolangSecurityTime = "2006-01-02T15:04:05Z"

	WorkGood       uint64 = 0
	GetReady       uint64 = 1
	FetchBad       uint64 = 2
	TunnelSendBad  uint64 = 4
	TunnelSyncBad  uint64 = 8
	ReplicaExecBad uint64 = 16

	MajorityWriteConcern = "majority"

	Int32max = (int64(1) << 32) - 1
)
View Source
const (
	DBRefRef = "$ref"
	DBRefId  = "$id"
	DBRefDb  = "$db"

	CollectionCapped           = "CollectionScan died due to position in capped" // bigger than 3.0
	CollectionCappedLowVersion = "UnknownError"                                  // <= 3.0 version
)
View Source
const (
	// log
	VarLogLevelDebug   = "debug"
	VarLogLevelInfo    = "info"
	VarLogLevelWarning = "warning"
	VarLogLevelError   = "error"

	// sync mode
	VarSyncModeAll  = "all"
	VarSyncModeIncr = "incr"
	VarSyncModeFull = "full"

	// mongo connect mode
	VarMongoConnectModePrimary            = "primary"
	VarMongoConnectModeSecondaryPreferred = "secondaryPreferred"
	VarMongoConnectModeSecondary          = "secondary"
	VarMongoConnectModeNearset            = "nearest"
	VarMongoConnectModeStandalone         = "standalone"

	// full_sync.create_index
	VarFullSyncCreateIndexNone       = "none"
	VarFullSyncCreateIndexForeground = "foreground"
	VarFullSyncCreateIndexBackground = "background"

	// incr_sync.mongo_fetch_method
	VarIncrSyncMongoFetchMethodOplog        = "oplog"
	VarIncrSyncMongoFetchMethodChangeStream = "change_stream"

	// incr_sync.shard_key
	VarIncrSyncShardKeyAuto       = "auto"
	VarIncrSyncShardKeyId         = "id"
	VarIncrSyncShardKeyCollection = "collection"

	// incr_sync.worker.oplog_compressor
	VarIncrSyncWorkerOplogCompressorNone    = "none"
	VarIncrSyncWorkerOplogCompressorGzip    = "gzip"
	VarIncrSyncWorkerOplogCompressorZlib    = "zlib"
	VarIncrSyncWorkerOplogCompressorDeflate = "deflate"
	VarIncrSyncWorkerOplogCompressorSnappy  = "snappy"

	// incr_sync.tunnel
	VarTunnelDirect = "direct"
	VarTunnelRpc    = "rpc"
	VarTunnelFile   = "file"
	VarTunnelTcp    = "tcp"
	VarTunnelKafka  = "kafka"
	VarTunnelMock   = "mock"

	// incr_sync.tunnel.message
	VarTunnelMessageRaw  = "raw"
	VarTunnelMessageJson = "json"
	VarTunnelMessageBson = "bson"

	// incr_sync.conflict_write_to
	VarIncrSyncConflictWriteToNone = "none"
	VarIncrSyncConflictWriteToDb   = "db"
	VarIncrSyncConflictWriteToSdk  = "sdk"

	// checkpoint.storage.db
	VarCheckpointStorageDbReplicaDefault  = "mongoshake"
	VarCheckpointStorageDbShardingDefault = "admin"
	VarCheckpointStorageCollectionDefault = "ckpt_default"

	// inner variable: checkpoint.storage
	VarCheckpointStorageApi      = "api"
	VarCheckpointStorageDatabase = "database"

	// innder variable: incr_sync.reader_debug
	VarIncrSyncReaderDebugNone    = ""
	VarIncrSyncReaderDebugDiscard = "discard" // throw all
	VarIncrSyncReaderDebugPrint   = "print"   // print

	// special
	VarSpecialSourceDBFlagAliyunServerless = "aliyun_serverless"
)
View Source
const (
	FetchStageStoreUnknown     int32 = 0
	FetchStageStoreDiskNoApply int32 = 1
	FetchStageStoreDiskApply   int32 = 2
	FetchStageStoreMemoryApply int32 = 3
)
View Source
const (
	JournalNothingOnDefault = iota
	JournalSampling
	JournalAll
)
View Source
const (
	FrequentInSeconds        = 5
	TimeFormat        string = "2006-01-02 15:04:05"
)
View Source
const (
	KB = 1024
	MB = 1024 * KB
	GB = 1024 * MB
	TB = 1024 * GB
	PB = 1024 * TB
)
View Source
const (
	METRIC_NONE            = 0x0000000000000000
	METRIC_CKPT_TIMES      = 0x0000000000000001
	METRIC_TUNNEL_TRAFFIC  = 0x0000000000000010
	METRIC_LSN             = 0x0000000000000100
	METRIC_RETRANSIMISSION = 0x0000000000001000
	METRIC_TPS             = 0x0000000000010000
	METRIC_SUCCESS         = 0x0000000000100000
	METRIC_WORKER          = 0x0000000001000000 // worker metric
	METRIC_FULLSYNC_WRITE  = 0x0000000010000000 // full sync writer
	METRIC_FILTER          = 0x0000000100000000
)
View Source
const (
	TypeFull = "full"
	TypeIncr = "incr"
)
View Source
const (
	OplogNS                      = "oplog.rs"
	ReadWriteConcernDefault      = ""
	ReadWriteConcernLocal        = "local"
	ReadWriteConcernAvailable    = "available" // for >= 3.6
	ReadWriteConcernMajority     = "majority"
	ReadWriteConcernLinearizable = "linearizable"
)
View Source
const (
	BufferCapacity = 4 * 1024 * 1024
)
View Source
const (
	OpsMax = 'z' - 'A'
)
View Source
const (
	SampleFrequency = 1000
)

Variables

View Source
var (
	AppDatabase         = VarCheckpointStorageDbReplicaDefault
	APPConflictDatabase = VarCheckpointStorageDbReplicaDefault + "_conflict"
)
View Source
var (
	FcvCheckpoint = Checkpoint{
		CurrentVersion:           2,
		FeatureCompatibleVersion: 1,
	}
	FcvConfiguration = Configuration{
		CurrentVersion:           10,
		FeatureCompatibleVersion: 10,
	}

	LowestCheckpointVersion = map[int]string{
		0: "1.0.0",
		1: "2.4.0",
		2: "2.4.6",
	}
	LowestConfigurationVersion = map[int]string{
		0:  "1.0.0",
		1:  "2.4.0",
		2:  "2.4.1",
		3:  "2.4.3",
		4:  "2.4.6",
		5:  "2.4.7",
		6:  "2.4.12",
		7:  "2.4.17",
		8:  "2.4.20",
		9:  "2.4.21",
		10: "2.6.4",
	}
)
View Source
var (
	FullSyncHttpApi *nimo.HttpRestProvider
	IncrSyncHttpApi *nimo.HttpRestProvider
)
View Source
var BRANCH = "$"

Build info

View Source
var FullSentinelOptions struct {
	TPS int64
}

only used in full sync.

View Source
var (
	GetAllTimestampInUTInput map[string]Pair // replicaSet/MongoS name => <oldest timestamp, newest timestamp>
)

for UT only

View Source
var IncrSentinelOptions struct {
	OplogDump      int64
	DuplicatedDump bool
	Pause          bool
	TPS            int64
	TargetDelay    int64
	ExitPoint      int64 // store 64 bit full timestamp
	Shutdown       bool  // close shake
}

IncrSentinelOptions. option's value type should be String or Bool or Int64 only used in incremental sync.

View Source
var JournalFilePattern = GlobalDiagnosticPath + string(filepath.Separator) + "%s.journal"
View Source
var (
	QueryTs = "ts"
)
View Source
var SIGNALPROFILE = "$"
View Source
var SIGNALSTACK = "$"

Functions

func ApplyOpsFilter

func ApplyOpsFilter(key string) bool

func BlockMongoUrlPassword

func BlockMongoUrlPassword(url, replace string) string

*

  • block password in mongo_urls:
  • two kind mongo_urls:
  • 1. mongodb://username:password@address
  • 2. username:password@address

func DEBUG_LOG

func DEBUG_LOG(arg0 interface{}, args ...interface{})

func DelayFor

func DelayFor(ms int64)

func DuplicateKey

func DuplicateKey(err error) bool

func ExtractMongoTimestamp

func ExtractMongoTimestamp(ts interface{}) int64

func ExtractMongoTimestampCounter

func ExtractMongoTimestampCounter(ts interface{}) int64

func ExtractTimestampForLog

func ExtractTimestampForLog(ts interface{}) string

func FindFirstErrorIndexAndMessageN

func FindFirstErrorIndexAndMessageN(err error) (int, string, bool)

func FullSyncInitHttpApi

func FullSyncInitHttpApi(port int)

func GetAllNamespace

func GetAllNamespace(sources []*MongoSource, filterFunc func(name string) bool,
	sslRootFile string) (map[NS]struct{}, map[string][]string, error)

*

  • return all namespace. return:
  • @map[NS]struct{}: namespace set where key is the namespace while value is useless, e.g., "a.b"->nil, "a.c"->nil
  • @map[string][]string: db->collection map. e.g., "a"->[]string{"b", "c"}
  • @error: error info

func GetAllTimestamp

func GetAllTimestamp(sources []*MongoSource, sslRootFile string) (map[string]TimestampNode, int64,
	int64, int64, int64, error)

* get all newest timestamp * return: * map: whole timestamp map, key: replset name, value: struct that includes the newest and oldest timestamp * primitive.Timestamp: the biggest of the newest timestamp * primitive.Timestamp: the smallest of the newest timestamp * error: error

func GetAllTimestampInUT

func GetAllTimestampInUT() (map[string]TimestampNode, int64,
	int64, int64, int64, error)

only used in unit test

func GetAndCompareVersion

func GetAndCompareVersion(conn *MongoCommunityConn, threshold string, compare string) (bool, error)

get current db version and compare to threshold. Return whether the result is bigger or equal to the input threshold.

func GetDBVersion

func GetDBVersion(conn *MongoCommunityConn) (string, error)

get db version, return string with format like "3.0.1"

func GetKey

func GetKey(log bson.D, wanted string) interface{}

func GetKeyWithIndex

func GetKeyWithIndex(log bson.D, wanted string) (interface{}, int)

func GetListCollectionQueryCondition

func GetListCollectionQueryCondition(conn *MongoCommunityConn) bson.M

func GetMetricWithSize

func GetMetricWithSize(input interface{}) string

func GetNewestTimestampByConn

func GetNewestTimestampByConn(conn *MongoCommunityConn) (int64, error)

get newest oplog

func GetNewestTimestampByUrl

func GetNewestTimestampByUrl(url string, fromMongoS bool, sslRootFile string) (int64, error)

func GetOldestTimestampByConn

func GetOldestTimestampByConn(conn *MongoCommunityConn) (int64, error)

get oldest oplog

func GetOldestTimestampByUrl

func GetOldestTimestampByUrl(url string, fromMongoS bool, sslRootFile string) (int64, error)

func Goodbye

func Goodbye()

func HasDuplicated

func HasDuplicated(slice []string) bool

func HaveIdIndexKey

func HaveIdIndexKey(obj bson.D) bool

Return true only Indexe only have key _id

func IncrSyncInitHttpApi

func IncrSyncInitHttpApi(port int)

func InitialLogger

func InitialLogger(logDir, logFile, level string, logFlush bool, verbose int) error

InitialLogger initialize logger

verbose: where log goes to: 0 - file,1 - file+stdout,2 - stdout

func Int64ToString

func Int64ToString(v int64) string

func Int64ToTimestamp

func Int64ToTimestamp(t int64) primitive.Timestamp

func IsCollectionCappedError

func IsCollectionCappedError(err error) bool

func JournalFileName

func JournalFileName(identifier string) string

func LogFetchStage

func LogFetchStage(stage int32) string

func MarshalStruct

func MarshalStruct(input interface{}) string

marshal given struct by json

func MayBeRandom

func MayBeRandom(port int) int

func Mkdirs

func Mkdirs(dirs ...string) error

func ParseIntFromInterface

func ParseIntFromInterface(input interface{}) (int64, error)

func RunStatusMessage

func RunStatusMessage(status uint64) string

func SetFiled

func SetFiled(input bson.D, key string, value interface{}, upsert bool)

func TimeStampToInt64

func TimeStampToInt64(ts primitive.Timestamp) int64

func TimeToTimestamp

func TimeToTimestamp(t int64) primitive.Timestamp

Unix() to TimeStamp

func TimestampToString

func TimestampToString(ts int64) string

func Welcome

func Welcome()

func WritePid

func WritePid(id string) (err error)

func WritePidById

func WritePidById(dir, id string) bool

func YieldInMs

func YieldInMs(n int64)

Types

type ChangeStreamConn

type ChangeStreamConn struct {
	Client    *mongo.Client
	CsHandler *mongo.ChangeStream
	Ops       *options.ChangeStreamOptions
	// contains filtered or unexported fields
}

func NewChangeStreamConn

func NewChangeStreamConn(src string,
	mode string,
	fullDoc bool,
	specialDb string,
	filterFunc func(name string) bool,
	watchStartTime interface{},
	batchSize int32,
	sourceDbversion string,
	sslRootFile string) (*ChangeStreamConn, error)

func (*ChangeStreamConn) Close

func (csc *ChangeStreamConn) Close()

func (*ChangeStreamConn) GetNext

func (csc *ChangeStreamConn) GetNext() (bool, []byte)

func (*ChangeStreamConn) IsNotNil

func (csc *ChangeStreamConn) IsNotNil() bool

func (*ChangeStreamConn) ResumeToken

func (csc *ChangeStreamConn) ResumeToken() interface{}

func (*ChangeStreamConn) TryNext

func (csc *ChangeStreamConn) TryNext() (bool, []byte)

type Checkpoint

type Checkpoint struct {
	/*
	 * version: 0(or set not), MongoShake < 2.4, fcv == 0
	 * version: 1, MongoShake == 2.4, 0 < fcv <= 1
	 */
	CurrentVersion           int
	FeatureCompatibleVersion int
}

for checkpoint

func (Checkpoint) IsCompatible

func (c Checkpoint) IsCompatible(v int) bool

type Configuration

type Configuration struct {
	/*
	 * version: 0(or set not), MongoShake < 2.4.0, fcv == 0
	 * version: 1, MongoShake == 2.4.0, 0 <= fcv <= 1
	 */
	CurrentVersion           int
	FeatureCompatibleVersion int
}

for configuration

func (Configuration) IsCompatible

func (c Configuration) IsCompatible(v int) bool

type ElapsedTask

type ElapsedTask struct {
	// timer trigger
	TimeLimit int64
	// batch trigger
	BatchLimit int64
	// contains filtered or unexported fields
}

func NewThresholder

func NewThresholder(timeLimit, batchLimit int64) *ElapsedTask

func (*ElapsedTask) Reset

func (thresholder *ElapsedTask) Reset()

func (*ElapsedTask) Triiger

func (thresholder *ElapsedTask) Triiger() bool

type Fcv

type Fcv interface {
	IsCompatible(int) bool
}

type Int64Slice

type Int64Slice []int64

func (Int64Slice) Len

func (p Int64Slice) Len() int

func (Int64Slice) Less

func (p Int64Slice) Less(i, j int) bool

func (Int64Slice) Swap

func (p Int64Slice) Swap(i, j int)

type Journal

type Journal struct {
	// contains filtered or unexported fields
}

func NewJournal

func NewJournal(name string) *Journal

func (*Journal) WriteRecord

func (j *Journal) WriteRecord(oplog *oplog.PartialLog)

type MetricDelta

type MetricDelta struct {
	Value uint64
	Delta uint64
	// contains filtered or unexported fields
}

struct used to mark the metric delta. Value: current value Delta: the difference between current value and previous value previous: store the previous value

func (*MetricDelta) Update

func (o *MetricDelta) Update()

type MongoCommunityConn

type MongoCommunityConn struct {
	Client *mongo.Client
	URL    string
	// contains filtered or unexported fields
}

func NewMongoCommunityConn

func NewMongoCommunityConn(url string, connectMode string, timeout bool, readConcern,
	writeConcern string, sslRootFile string) (*MongoCommunityConn, error)

func (*MongoCommunityConn) AcquireReplicaSetName

func (conn *MongoCommunityConn) AcquireReplicaSetName() string

func (*MongoCommunityConn) Close

func (conn *MongoCommunityConn) Close()

func (*MongoCommunityConn) CurrentDate

func (conn *MongoCommunityConn) CurrentDate() primitive.Timestamp

func (*MongoCommunityConn) HasOplogNs

func (conn *MongoCommunityConn) HasOplogNs(queryConditon bson.M) bool

func (*MongoCommunityConn) HasUniqueIndex

func (conn *MongoCommunityConn) HasUniqueIndex(queryConditon bson.M) bool

func (*MongoCommunityConn) IsGood

func (conn *MongoCommunityConn) IsGood() bool

func (*MongoCommunityConn) IsTimeSeriesCollection

func (conn *MongoCommunityConn) IsTimeSeriesCollection(dbName string, collName string) bool

type MongoSource

type MongoSource struct {
	URL         string
	ReplicaName string
	Gids        []string
}

func (*MongoSource) String

func (ms *MongoSource) String() string

type NS

type NS struct {
	Database   string
	Collection string
}

func GetDbNamespace

func GetDbNamespace(url string, filterFunc func(name string) bool, sslRootFile string) ([]NS, map[string][]string, error)

*

  • return db namespace. return:
  • @[]NS: namespace list, e.g., []{"a.b", "a.c"}
  • @map[string][]string: db->collection map. e.g., "a"->[]string{"b", "c"}
  • @error: error info

func NewNS

func NewNS(namespace string) NS

func (NS) Str

func (ns NS) Str() string

type OpsCounter

type OpsCounter struct {
	// contains filtered or unexported fields
}

one writer and multi readers

func (*OpsCounter) Add

func (opsCounter *OpsCounter) Add(char byte, v uint64)

func (*OpsCounter) Map

func (opsCounter *OpsCounter) Map() map[string]uint64

type Pair

type Pair struct {
	First  interface{}
	Second interface{}
}

type Qos

type Qos struct {
	Limit  int64 // qps, <= 0 means disable limit
	Ticket int64 // one tick size, default is 1
	// contains filtered or unexported fields
}

func StartQoS

func StartQoS(limit, ticket int64, addr *int64) *Qos

func (*Qos) Close

func (q *Qos) Close()

func (*Qos) FetchBucket

func (q *Qos) FetchBucket()

type ReplicationMetric

type ReplicationMetric struct {
	NAME      string
	STAGE     string
	SUBSCRIBE uint64

	OplogFilter     MetricDelta
	OplogGet        MetricDelta
	OplogConsume    MetricDelta
	OplogApply      MetricDelta
	OplogSuccess    MetricDelta
	OplogFail       MetricDelta
	OplogWriteFail  MetricDelta // full: write failed. currently, only used in full sync stage.
	CheckpointTimes uint64
	Retransmission  uint64
	TunnelTraffic   uint64
	LSN             int64
	LSNAck          int64
	LSNCheckpoint   int64

	OplogMaxSize int64
	OplogAvgSize int64

	TableOperations *TableOps

	// replication status
	ReplStatus ReplicationStatus
	// contains filtered or unexported fields
}

func NewMetric

func NewMetric(name, stage string, subscribe uint64) *ReplicationMetric

func (*ReplicationMetric) AddApply

func (metric *ReplicationMetric) AddApply(incr uint64)

func (*ReplicationMetric) AddCheckpoint

func (metric *ReplicationMetric) AddCheckpoint(number uint64)

func (*ReplicationMetric) AddConsume

func (metric *ReplicationMetric) AddConsume(incr uint64)

func (*ReplicationMetric) AddFailed

func (metric *ReplicationMetric) AddFailed(incr uint64)

func (*ReplicationMetric) AddFilter

func (metric *ReplicationMetric) AddFilter(incr uint64)

func (*ReplicationMetric) AddGet

func (metric *ReplicationMetric) AddGet(incr uint64)

func (*ReplicationMetric) AddRetransmission

func (metric *ReplicationMetric) AddRetransmission(number uint64)

func (*ReplicationMetric) AddSuccess

func (metric *ReplicationMetric) AddSuccess(incr uint64)

func (*ReplicationMetric) AddTableOps

func (metric *ReplicationMetric) AddTableOps(table string, n uint64)

func (*ReplicationMetric) AddTunnelTraffic

func (metric *ReplicationMetric) AddTunnelTraffic(number uint64)

func (*ReplicationMetric) AddWriteFailed

func (metric *ReplicationMetric) AddWriteFailed(incr uint64)

func (*ReplicationMetric) Apply

func (metric *ReplicationMetric) Apply() uint64

func (*ReplicationMetric) Close

func (metric *ReplicationMetric) Close()

func (*ReplicationMetric) Get

func (metric *ReplicationMetric) Get() uint64

func (*ReplicationMetric) SetLSN

func (metric *ReplicationMetric) SetLSN(lsn int64)

func (*ReplicationMetric) SetLSNACK

func (metric *ReplicationMetric) SetLSNACK(ack int64)

func (*ReplicationMetric) SetLSNCheckpoint

func (metric *ReplicationMetric) SetLSNCheckpoint(ckpt int64)

func (*ReplicationMetric) SetOplogAvg

func (metric *ReplicationMetric) SetOplogAvg(size int64)

func (*ReplicationMetric) SetOplogMax

func (metric *ReplicationMetric) SetOplogMax(max int64)

func (*ReplicationMetric) String

func (metric *ReplicationMetric) String() string

func (*ReplicationMetric) Success

func (metric *ReplicationMetric) Success() uint64

func (*ReplicationMetric) TableOps

func (metric *ReplicationMetric) TableOps() map[string]uint64

func (*ReplicationMetric) Tps

func (metric *ReplicationMetric) Tps() uint64

type ReplicationStatus

type ReplicationStatus uint64

func (*ReplicationStatus) Clear

func (status *ReplicationStatus) Clear(s uint64)

func (*ReplicationStatus) GetStatusString

func (status *ReplicationStatus) GetStatusString() string

func (*ReplicationStatus) IsGood

func (status *ReplicationStatus) IsGood() bool

func (*ReplicationStatus) Update

func (status *ReplicationStatus) Update(s uint64)

type Sentinel

type Sentinel struct {
	// contains filtered or unexported fields
}

func NewSentinel

func NewSentinel(tp string) *Sentinel

func (*Sentinel) Register

func (sentinel *Sentinel) Register()

type TableOps

type TableOps struct {
	sync.Mutex
	// contains filtered or unexported fields
}

TableOps, count collection operations

func NewTableOps

func NewTableOps() *TableOps

func (*TableOps) Incr

func (t *TableOps) Incr(table string, n uint64)

func (*TableOps) MakeCopy

func (t *TableOps) MakeCopy() map[string]uint64

type TimestampNode

type TimestampNode struct {
	Oldest int64
	Newest int64
}

record the oldest and newest timestamp of each mongod

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL