Documentation ¶
Index ¶
- Constants
- Variables
- func DeduplicateMsgs(ctx context.Context, delMsgs []*proto.DeleteMsg) (msgs []*proto.DeleteMsg)
- func DoubleCheckedRun(ctx context.Context, c IClusterTopology, vid proto.Vid, ...) error
- func NewHandler(service *Service) *rpc.Router
- type BalanceMgr
- type BalanceMgrConfig
- type BlobDeleteConfig
- type BlobDeleteKafkaConfig
- type BlobDeleteMgr
- type ClusterTopology
- type ClusterTopologyMgr
- func (m *ClusterTopologyMgr) GetIDCDisks(idc string) (disks []*client.DiskInfoSimple)
- func (m *ClusterTopologyMgr) GetIDCs() map[string]*IDC
- func (m *ClusterTopologyMgr) GetVolume(vid proto.Vid) (*client.VolumeInfoSimple, error)
- func (m *ClusterTopologyMgr) IsBrokenDisk(diskID proto.DiskID) bool
- func (m *ClusterTopologyMgr) LoadVolumes() error
- func (m *ClusterTopologyMgr) MaxFreeChunksDisk(idc string) *client.DiskInfoSimple
- func (m *ClusterTopologyMgr) ReportFreeChunkCnt(disk *client.DiskInfoSimple)
- func (m *ClusterTopologyMgr) UpdateVolume(vid proto.Vid) (*client.VolumeInfoSimple, error)
- type Config
- type DelDoc
- type DeletedTask
- type DiskDropMgr
- func (mgr *DiskDropMgr) DiskProgress(ctx context.Context, diskID proto.DiskID) (stats *api.DiskMigratingStats, err error)
- func (mgr *DiskDropMgr) Load() (err error)
- func (mgr *DiskDropMgr) Progress(ctx context.Context) (migratingDisks []proto.DiskID, total, migrated int)
- func (mgr *DiskDropMgr) Run()
- type DiskRepairMgr
- func (mgr *DiskRepairMgr) AcquireTask(ctx context.Context, idc string) (task proto.MigrateTask, err error)
- func (mgr *DiskRepairMgr) CancelTask(ctx context.Context, args *api.OperateTaskArgs) error
- func (mgr *DiskRepairMgr) CompleteTask(ctx context.Context, args *api.OperateTaskArgs) error
- func (mgr *DiskRepairMgr) DiskProgress(ctx context.Context, diskID proto.DiskID) (stats *api.DiskMigratingStats, err error)
- func (mgr *DiskRepairMgr) Enabled() bool
- func (mgr *DiskRepairMgr) Load() error
- func (mgr *DiskRepairMgr) Progress(ctx context.Context) (migratingDisks []proto.DiskID, total, migrated int)
- func (mgr *DiskRepairMgr) QueryTask(ctx context.Context, taskID string) (*api.MigrateTaskDetail, error)
- func (mgr *DiskRepairMgr) ReclaimTask(ctx context.Context, idc, taskID string, src []proto.VunitLocation, ...) error
- func (mgr *DiskRepairMgr) RenewalTask(ctx context.Context, idc, taskID string) error
- func (mgr *DiskRepairMgr) ReportWorkerTaskStats(st *api.TaskReportArgs)
- func (mgr *DiskRepairMgr) Run()
- func (mgr *DiskRepairMgr) StatQueueTaskCnt() (inited, prepared, completed int)
- func (mgr *DiskRepairMgr) Stats() api.MigrateTasksStat
- func (mgr *DiskRepairMgr) WaitEnable()
- type Host
- type HourRange
- type IClusterTopology
- type IDC
- type IDisKMigrator
- type IManualMigrator
- type IMigrator
- type ITaskRunner
- type IVolumeCache
- type IVolumeInspector
- type KafkaConfig
- type MMigrator
- type ManualMigrateMgr
- type MigrateConfig
- type MigrateMgr
- func (mgr *MigrateMgr) AcquireTask(ctx context.Context, idc string) (task proto.MigrateTask, err error)
- func (mgr *MigrateMgr) AddTask(ctx context.Context, task *proto.MigrateTask)
- func (mgr *MigrateMgr) CancelTask(ctx context.Context, args *api.OperateTaskArgs) (err error)
- func (mgr *MigrateMgr) ClearDeletedTaskByID(diskID proto.DiskID, taskID string)
- func (mgr *MigrateMgr) ClearDeletedTasks(diskID proto.DiskID)
- func (mgr *MigrateMgr) CompleteTask(ctx context.Context, args *api.OperateTaskArgs) (err error)
- func (mgr *MigrateMgr) DeletedTasks() []DeletedTask
- func (mgr *MigrateMgr) Enabled() bool
- func (mgr *MigrateMgr) FinishTaskInAdvanceWhenLockFail(ctx context.Context, task *proto.MigrateTask)
- func (mgr *MigrateMgr) GetMigratingDiskNum() int
- func (mgr *MigrateMgr) GetTask(ctx context.Context, taskID string) (*proto.MigrateTask, error)
- func (mgr *MigrateMgr) IsDeletedTask(task *proto.MigrateTask) bool
- func (mgr *MigrateMgr) IsMigratingDisk(diskID proto.DiskID) bool
- func (mgr *MigrateMgr) ListAllTask(ctx context.Context) (tasks []*proto.MigrateTask, err error)
- func (mgr *MigrateMgr) ListAllTaskByDiskID(ctx context.Context, diskID proto.DiskID) (tasks []*proto.MigrateTask, err error)
- func (mgr *MigrateMgr) Load() (err error)
- func (mgr *MigrateMgr) QueryTask(ctx context.Context, taskID string) (*api.MigrateTaskDetail, error)
- func (mgr *MigrateMgr) ReclaimTask(ctx context.Context, idc, taskID string, src []proto.VunitLocation, ...) (err error)
- func (mgr *MigrateMgr) RenewalTask(ctx context.Context, idc, taskID string) (err error)
- func (mgr *MigrateMgr) ReportWorkerTaskStats(st *api.TaskReportArgs)
- func (mgr *MigrateMgr) Run()
- func (mgr *MigrateMgr) SetClearJunkTasksWhenLoadingFunc(clearJunkTasksWhenLoadingFunc clearJunkTasksFunc)
- func (mgr *MigrateMgr) SetLockFailHandleFunc(lockFailHandleFunc lockFailFunc)
- func (mgr *MigrateMgr) StatQueueTaskCnt() (inited, prepared, completed int)
- func (mgr *MigrateMgr) Stats() api.MigrateTasksStat
- func (mgr *MigrateMgr) WaitEnable()
- type MigratingVuids
- type Migrator
- type OrphanShard
- type Rack
- type Service
- func (svr *Service) Close()
- func (svr *Service) CloseKafkaMonitors()
- func (svr *Service) HTTPDiskMigratingStats(c *rpc.Context)
- func (svr *Service) HTTPInspectAcquire(c *rpc.Context)
- func (svr *Service) HTTPInspectComplete(c *rpc.Context)
- func (svr *Service) HTTPManualMigrateTaskAdd(c *rpc.Context)
- func (svr *Service) HTTPMigrateTaskDetail(c *rpc.Context)
- func (svr *Service) HTTPStats(c *rpc.Context)
- func (svr *Service) HTTPTaskAcquire(c *rpc.Context)
- func (svr *Service) HTTPTaskCancel(c *rpc.Context)
- func (svr *Service) HTTPTaskComplete(c *rpc.Context)
- func (svr *Service) HTTPTaskReclaim(c *rpc.Context)
- func (svr *Service) HTTPTaskRenewal(c *rpc.Context)
- func (svr *Service) HTTPTaskReport(c *rpc.Context)
- func (svr *Service) HTTPUpdateVolume(c *rpc.Context)
- func (svr *Service) Handler(w http.ResponseWriter, req *http.Request, ...)
- func (svr *Service) LoadVolInfo() error
- func (svr *Service) NewKafkaMonitor(clusterID proto.ClusterID, access base.IConsumerOffset) error
- func (svr *Service) Run()
- func (svr *Service) RunKafkaMonitors()
- func (svr *Service) RunTask()
- type ServiceRegisterConfig
- type Services
- type ShardRepairConfig
- type ShardRepairKafkaConfig
- type ShardRepairMgr
- type TopicConfig
- type VolumeCache
- type VolumeInspectMgr
- func (mgr *VolumeInspectMgr) AcquireInspect(ctx context.Context) (*proto.VolumeInspectTask, error)
- func (mgr *VolumeInspectMgr) CompleteInspect(ctx context.Context, ret *proto.VolumeInspectRet)
- func (mgr *VolumeInspectMgr) Enabled() bool
- func (mgr *VolumeInspectMgr) GetTaskStats() (finished, timeout [counter.SLOT]int)
- func (mgr *VolumeInspectMgr) Run()
- type VolumeInspectMgrCfg
Constants ¶
const ( DelDone = deleteStatus(iota) DelDelay DelFailed DelUnexpect )
blob delete status
const ( ShardRepairDone = shardRepairStatus(iota) ShardRepairFailed ShardRepairUnexpect ShardRepairOrphan )
shard repair status
const (
ShardRepair = "shard_repair"
)
shard repair name
Variables ¶
var ( // ErrNoBalanceVunit no balance volume unit on disk ErrNoBalanceVunit = errors.New("no balance volume unit on disk") // ErrTooManyBalancingTasks too many balancing tasks ErrTooManyBalancingTasks = errors.New("too many balancing tasks") )
ErrBlobnodeServiceUnavailable worker service unavailable
var ErrFrequentlyUpdate = errors.New("frequently update")
ErrFrequentlyUpdate frequently update
var ErrVunitLengthNotEqual = errors.New("vunit length not equal")
ErrVunitLengthNotEqual vunit length not equal
Functions ¶
func DeduplicateMsgs ¶
DeduplicateMsgs deduplicate delete messages
func DoubleCheckedRun ¶
func DoubleCheckedRun(ctx context.Context, c IClusterTopology, vid proto.Vid, task func(*client.VolumeInfoSimple) (*client.VolumeInfoSimple, error)) error
DoubleCheckedRun the scheduler updates volume mapping relation asynchronously, then some task(delete or repair) had started with old volume mapping.
if delete on old relation, there will has garbage shard in new chunk. ==> garbage shard if repair on old relation, there still is missing shard in new chunk. ==> missing shard
func NewHandler ¶
NewHandler returns app server handler
Types ¶
type BalanceMgr ¶
type BalanceMgr struct { IMigrator // contains filtered or unexported fields }
BalanceMgr balance manager
func NewBalanceMgr ¶
func NewBalanceMgr(clusterMgrCli client.ClusterMgrAPI, volumeUpdater client.IVolumeUpdater, taskSwitch taskswitch.ISwitcher, clusterTopology IClusterTopology, taskLogger recordlog.Encoder, conf *BalanceMgrConfig) *BalanceMgr
NewBalanceMgr returns balance manager
type BalanceMgrConfig ¶
type BalanceMgrConfig struct { MaxDiskFreeChunkCnt int64 `json:"max_disk_free_chunk_cnt"` MinDiskFreeChunkCnt int64 `json:"min_disk_free_chunk_cnt"` MigrateConfig }
BalanceMgrConfig balance task manager config
type BlobDeleteConfig ¶
type BlobDeleteConfig struct { ClusterID proto.ClusterID TaskPoolSize int `json:"task_pool_size"` FailMsgConsumeIntervalMs int64 `json:"fail_msg_consume_interval_ms"` NormalHandleBatchCnt int `json:"normal_handle_batch_cnt"` FailHandleBatchCnt int `json:"fail_handle_batch_cnt"` SafeDelayTimeH int64 `json:"safe_delay_time_h"` DeleteHourRange HourRange `json:"delete_hour_range"` DeleteLog recordlog.Config `json:"delete_log"` Kafka BlobDeleteKafkaConfig `json:"-"` }
BlobDeleteConfig is blob delete config
type BlobDeleteKafkaConfig ¶
type BlobDeleteKafkaConfig struct { BrokerList []string `json:"-"` FailMsgSenderTimeoutMs int64 `json:"-"` Normal TopicConfig `json:"normal"` Failed TopicConfig `json:"failed"` }
BlobDeleteKafkaConfig is kafka config of blob delete
type BlobDeleteMgr ¶
type BlobDeleteMgr struct {
// contains filtered or unexported fields
}
BlobDeleteMgr is blob delete manager
func NewBlobDeleteMgr ¶
func NewBlobDeleteMgr( cfg *BlobDeleteConfig, clusterTopology IClusterTopology, blobnodeCli client.BlobnodeAPI, switchMgr *taskswitch.SwitchMgr, clusterMgrCli client.ClusterMgrAPI, ) (*BlobDeleteMgr, error)
NewBlobDeleteMgr returns blob delete manager
func (*BlobDeleteMgr) Enabled ¶
func (mgr *BlobDeleteMgr) Enabled() bool
Enabled returns return if delete task switch is enable, otherwise returns false
func (*BlobDeleteMgr) GetErrorStats ¶
func (mgr *BlobDeleteMgr) GetErrorStats() (errStats []string, totalErrCnt uint64)
GetErrorStats returns error stats
func (*BlobDeleteMgr) GetTaskStats ¶
GetTaskStats returns task stats
func (*BlobDeleteMgr) RunTask ¶
func (mgr *BlobDeleteMgr) RunTask()
RunTask consumers delete messages
type ClusterTopology ¶
type ClusterTopology struct { FreeChunkCnt int64 MaxChunkCnt int64 // contains filtered or unexported fields }
ClusterTopology cluster topology
type ClusterTopologyMgr ¶
ClusterTopologyMgr cluster topology manager
func (*ClusterTopologyMgr) GetIDCDisks ¶
func (m *ClusterTopologyMgr) GetIDCDisks(idc string) (disks []*client.DiskInfoSimple)
GetIDCDisks returns disks with IDC
func (*ClusterTopologyMgr) GetIDCs ¶
func (m *ClusterTopologyMgr) GetIDCs() map[string]*IDC
GetIDCs returns IDCs
func (*ClusterTopologyMgr) GetVolume ¶
func (m *ClusterTopologyMgr) GetVolume(vid proto.Vid) (*client.VolumeInfoSimple, error)
func (*ClusterTopologyMgr) IsBrokenDisk ¶
func (m *ClusterTopologyMgr) IsBrokenDisk(diskID proto.DiskID) bool
func (*ClusterTopologyMgr) LoadVolumes ¶
func (m *ClusterTopologyMgr) LoadVolumes() error
func (*ClusterTopologyMgr) MaxFreeChunksDisk ¶
func (m *ClusterTopologyMgr) MaxFreeChunksDisk(idc string) *client.DiskInfoSimple
MaxFreeChunksDisk returns disk which has max free chunks
func (*ClusterTopologyMgr) ReportFreeChunkCnt ¶
func (m *ClusterTopologyMgr) ReportFreeChunkCnt(disk *client.DiskInfoSimple)
ReportFreeChunkCnt report free chunk cnt
func (*ClusterTopologyMgr) UpdateVolume ¶
func (m *ClusterTopologyMgr) UpdateVolume(vid proto.Vid) (*client.VolumeInfoSimple, error)
type Config ¶
type Config struct { cmd.Config ClusterID proto.ClusterID `json:"cluster_id"` Services Services `json:"services"` TopologyUpdateIntervalMin int `json:"topology_update_interval_min"` VolumeCacheUpdateIntervalS int `json:"volume_cache_update_interval_s"` FreeChunkCounterBuckets []float64 `json:"free_chunk_counter_buckets"` ClusterMgr clustermgr.Config `json:"clustermgr"` Proxy proxy.LbConfig `json:"proxy"` Blobnode blobnode.Config `json:"blobnode"` Scheduler scheduler.Config `json:"scheduler"` Balance BalanceMgrConfig `json:"balance"` DiskDrop MigrateConfig `json:"disk_drop"` DiskRepair MigrateConfig `json:"disk_repair"` ManualMigrate MigrateConfig `json:"manual_migrate"` VolumeInspect VolumeInspectMgrCfg `json:"volume_inspect"` TaskLog recordlog.Config `json:"task_log"` Kafka KafkaConfig `json:"kafka"` ShardRepair ShardRepairConfig `json:"shard_repair"` BlobDelete BlobDeleteConfig `json:"blob_delete"` ServiceRegister ServiceRegisterConfig `json:"service_register"` }
Config service config
type DelDoc ¶
type DelDoc struct { ClusterID proto.ClusterID `json:"cid"` Bid proto.BlobID `json:"bid"` Vid proto.Vid `json:"vid"` Retry int `json:"retry"` Time int64 `json:"t"` ReqID string `json:"rid"` ActualDelTime int64 `json:"del_at"` // unix time in S }
DelDoc is a delete doc information for logging in dellog
type DeletedTask ¶
type DiskDropMgr ¶
type DiskDropMgr struct { IMigrator // contains filtered or unexported fields }
DiskDropMgr disk drop manager
func NewDiskDropMgr ¶
func NewDiskDropMgr(clusterMgrCli client.ClusterMgrAPI, volumeUpdater client.IVolumeUpdater, taskSwitch taskswitch.ISwitcher, taskLogger recordlog.Encoder, conf *MigrateConfig) *DiskDropMgr
NewDiskDropMgr returns disk drop manager
func (*DiskDropMgr) DiskProgress ¶
func (mgr *DiskDropMgr) DiskProgress(ctx context.Context, diskID proto.DiskID) (stats *api.DiskMigratingStats, err error)
func (*DiskDropMgr) Load ¶
func (mgr *DiskDropMgr) Load() (err error)
Load load disk drop task from database
type DiskRepairMgr ¶
DiskRepairMgr repair task manager
func NewDiskRepairMgr ¶
func NewDiskRepairMgr(clusterMgrCli client.ClusterMgrAPI, taskSwitch taskswitch.ISwitcher, taskLogger recordlog.Encoder, cfg *MigrateConfig) *DiskRepairMgr
NewDiskRepairMgr returns repair manager
func (*DiskRepairMgr) AcquireTask ¶
func (mgr *DiskRepairMgr) AcquireTask(ctx context.Context, idc string) (task proto.MigrateTask, err error)
AcquireTask acquire repair task
func (*DiskRepairMgr) CancelTask ¶
func (mgr *DiskRepairMgr) CancelTask(ctx context.Context, args *api.OperateTaskArgs) error
CancelTask cancel repair task
func (*DiskRepairMgr) CompleteTask ¶
func (mgr *DiskRepairMgr) CompleteTask(ctx context.Context, args *api.OperateTaskArgs) error
CompleteTask complete repair task
func (*DiskRepairMgr) DiskProgress ¶
func (mgr *DiskRepairMgr) DiskProgress(ctx context.Context, diskID proto.DiskID) (stats *api.DiskMigratingStats, err error)
func (*DiskRepairMgr) Enabled ¶
func (mgr *DiskRepairMgr) Enabled() bool
func (*DiskRepairMgr) Load ¶
func (mgr *DiskRepairMgr) Load() error
Load load repair task from database
func (*DiskRepairMgr) Progress ¶
func (mgr *DiskRepairMgr) Progress(ctx context.Context) (migratingDisks []proto.DiskID, total, migrated int)
Progress repair manager progress
func (*DiskRepairMgr) QueryTask ¶
func (mgr *DiskRepairMgr) QueryTask(ctx context.Context, taskID string) (*api.MigrateTaskDetail, error)
QueryTask return task statistics
func (*DiskRepairMgr) ReclaimTask ¶
func (mgr *DiskRepairMgr) ReclaimTask(ctx context.Context, idc, taskID string, src []proto.VunitLocation, oldDst proto.VunitLocation, newDst *client.AllocVunitInfo) error
ReclaimTask reclaim repair task
func (*DiskRepairMgr) RenewalTask ¶
func (mgr *DiskRepairMgr) RenewalTask(ctx context.Context, idc, taskID string) error
RenewalTask renewal repair task
func (*DiskRepairMgr) ReportWorkerTaskStats ¶
func (mgr *DiskRepairMgr) ReportWorkerTaskStats(st *api.TaskReportArgs)
ReportWorkerTaskStats reports task stats
func (*DiskRepairMgr) Run ¶
func (mgr *DiskRepairMgr) Run()
Run run repair task includes collect/prepare/finish/check phase
func (*DiskRepairMgr) StatQueueTaskCnt ¶
func (mgr *DiskRepairMgr) StatQueueTaskCnt() (inited, prepared, completed int)
StatQueueTaskCnt returns task queue stats
func (*DiskRepairMgr) Stats ¶
func (mgr *DiskRepairMgr) Stats() api.MigrateTasksStat
Stats returns task stats
func (*DiskRepairMgr) WaitEnable ¶
func (mgr *DiskRepairMgr) WaitEnable()
type Host ¶
type Host struct { FreeChunkCnt int64 // host free chunk count MaxChunkCnt int64 // total chunk count // contains filtered or unexported fields }
Host host info
type IClusterTopology ¶
type IClusterTopology interface { GetIDCs() map[string]*IDC GetIDCDisks(idc string) (disks []*client.DiskInfoSimple) MaxFreeChunksDisk(idc string) *client.DiskInfoSimple IsBrokenDisk(diskID proto.DiskID) bool IVolumeCache closer.Closer }
IClusterTopology define the interface og cluster topology
func NewClusterTopologyMgr ¶
func NewClusterTopologyMgr(topologyClient client.ClusterMgrAPI, cfg *clusterTopologyConfig) IClusterTopology
NewClusterTopologyMgr returns cluster topology manager
type IDisKMigrator ¶
type IDisKMigrator interface { Migrator Progress(ctx context.Context) (migratingDisks []proto.DiskID, total, migrated int) DiskProgress(ctx context.Context, diskID proto.DiskID) (stats *api.DiskMigratingStats, err error) }
IDisKMigrator base interface of disk migrate, such as disk repair and disk drop
type IManualMigrator ¶
type IManualMigrator interface { Migrator AddManualTask(ctx context.Context, vuid proto.Vuid, forbiddenDirectDownload bool) (err error) }
IManualMigrator interface of manual migrator
type IMigrator ¶
type IMigrator interface { Migrator // inner interface SetLockFailHandleFunc(lockFailHandleFunc lockFailFunc) SetClearJunkTasksWhenLoadingFunc(clearJunkTasksWhenLoadingFunc clearJunkTasksFunc) GetMigratingDiskNum() int IsMigratingDisk(diskID proto.DiskID) bool ClearDeletedTasks(diskID proto.DiskID) ClearDeletedTaskByID(diskID proto.DiskID, taskID string) IsDeletedTask(task *proto.MigrateTask) bool DeletedTasks() []DeletedTask AddTask(ctx context.Context, task *proto.MigrateTask) GetTask(ctx context.Context, taskID string) (*proto.MigrateTask, error) ListAllTask(ctx context.Context) (tasks []*proto.MigrateTask, err error) ListAllTaskByDiskID(ctx context.Context, diskID proto.DiskID) (tasks []*proto.MigrateTask, err error) FinishTaskInAdvanceWhenLockFail(ctx context.Context, task *proto.MigrateTask) }
IMigrator interface of common migrator
type ITaskRunner ¶
type ITaskRunner interface { Enabled() bool RunTask() GetTaskStats() (success, failed [counter.SLOT]int) GetErrorStats() (errStats []string, totalErrCnt uint64) }
ITaskRunner define the interface of task running status.
type IVolumeCache ¶
type IVolumeCache interface { UpdateVolume(vid proto.Vid) (*client.VolumeInfoSimple, error) GetVolume(vid proto.Vid) (*client.VolumeInfoSimple, error) LoadVolumes() error }
IVolumeCache define the interface used for volume cache manager
type IVolumeInspector ¶
type IVolumeInspector interface { AcquireInspect(ctx context.Context) (*proto.VolumeInspectTask, error) CompleteInspect(ctx context.Context, ret *proto.VolumeInspectRet) GetTaskStats() (finished, timeout [counter.SLOT]int) Enabled() bool Run() closer.Closer }
IVolumeInspector define the interface of volume inspect manager
type KafkaConfig ¶
type KafkaConfig struct { BrokerList []string `json:"broker_list"` FailMsgSenderTimeoutMs int64 `json:"fail_msg_sender_timeout_ms"` ShardRepair ShardRepairKafkaConfig `json:"shard_repair"` BlobDelete BlobDeleteKafkaConfig `json:"blob_delete"` }
KafkaConfig kafka config
type MMigrator ¶
type MMigrator interface { IMigrator IDisKMigrator IManualMigrator }
MMigrator merged interfaces for mocking.
type ManualMigrateMgr ¶
type ManualMigrateMgr struct { IMigrator // contains filtered or unexported fields }
ManualMigrateMgr manual migrate manager
func NewManualMigrateMgr ¶
func NewManualMigrateMgr(clusterMgrCli client.ClusterMgrAPI, volumeUpdater client.IVolumeUpdater, taskLogger recordlog.Encoder, conf *MigrateConfig) *ManualMigrateMgr
NewManualMigrateMgr returns manual migrate manager
func (*ManualMigrateMgr) AddManualTask ¶
func (mgr *ManualMigrateMgr) AddManualTask(ctx context.Context, vuid proto.Vuid, forbiddenDirectDownload bool) (err error)
AddManualTask add manual migrate task
type MigrateConfig ¶
type MigrateConfig struct { ClusterID proto.ClusterID `json:"-"` // fill in config.go base.TaskCommonConfig }
MigrateConfig migrate config
type MigrateMgr ¶
MigrateMgr migrate manager
func NewMigrateMgr ¶
func NewMigrateMgr( clusterMgrCli client.ClusterMgrAPI, volumeUpdater client.IVolumeUpdater, taskSwitch taskswitch.ISwitcher, taskLogger recordlog.Encoder, conf *MigrateConfig, taskType proto.TaskType, ) *MigrateMgr
NewMigrateMgr returns migrate manager
func (*MigrateMgr) AcquireTask ¶
func (mgr *MigrateMgr) AcquireTask(ctx context.Context, idc string) (task proto.MigrateTask, err error)
AcquireTask acquire migrate task
func (*MigrateMgr) AddTask ¶
func (mgr *MigrateMgr) AddTask(ctx context.Context, task *proto.MigrateTask)
AddTask adds migrate task
func (*MigrateMgr) CancelTask ¶
func (mgr *MigrateMgr) CancelTask(ctx context.Context, args *api.OperateTaskArgs) (err error)
CancelTask cancel migrate task
func (*MigrateMgr) ClearDeletedTaskByID ¶
func (mgr *MigrateMgr) ClearDeletedTaskByID(diskID proto.DiskID, taskID string)
ClearDeletedTaskByID clear migrated task
func (*MigrateMgr) ClearDeletedTasks ¶
func (mgr *MigrateMgr) ClearDeletedTasks(diskID proto.DiskID)
ClearDeletedTasks clear tasks when disk is migrated
func (*MigrateMgr) CompleteTask ¶
func (mgr *MigrateMgr) CompleteTask(ctx context.Context, args *api.OperateTaskArgs) (err error)
CompleteTask complete migrate task
func (*MigrateMgr) DeletedTasks ¶
func (mgr *MigrateMgr) DeletedTasks() []DeletedTask
DeletedTasks returns deleted tasks
func (*MigrateMgr) FinishTaskInAdvanceWhenLockFail ¶
func (mgr *MigrateMgr) FinishTaskInAdvanceWhenLockFail(ctx context.Context, task *proto.MigrateTask)
FinishTaskInAdvanceWhenLockFail finish migrate task in advance when lock volume failed
func (*MigrateMgr) GetMigratingDiskNum ¶
func (mgr *MigrateMgr) GetMigratingDiskNum() int
GetMigratingDiskNum returns migrating disk count
func (*MigrateMgr) GetTask ¶
func (mgr *MigrateMgr) GetTask(ctx context.Context, taskID string) (*proto.MigrateTask, error)
GetTask returns task in db
func (*MigrateMgr) IsDeletedTask ¶
func (mgr *MigrateMgr) IsDeletedTask(task *proto.MigrateTask) bool
IsDeletedTask return true if the task is deleted
func (*MigrateMgr) IsMigratingDisk ¶
func (mgr *MigrateMgr) IsMigratingDisk(diskID proto.DiskID) bool
IsMigratingDisk returns true if disk is migrating
func (*MigrateMgr) ListAllTask ¶
func (mgr *MigrateMgr) ListAllTask(ctx context.Context) (tasks []*proto.MigrateTask, err error)
ListAllTask returns all migrate task
func (*MigrateMgr) ListAllTaskByDiskID ¶
func (mgr *MigrateMgr) ListAllTaskByDiskID(ctx context.Context, diskID proto.DiskID) (tasks []*proto.MigrateTask, err error)
ListAllTaskByDiskID return all task by diskID
func (*MigrateMgr) Load ¶
func (mgr *MigrateMgr) Load() (err error)
Load load migrate task from database
func (*MigrateMgr) QueryTask ¶
func (mgr *MigrateMgr) QueryTask(ctx context.Context, taskID string) (*api.MigrateTaskDetail, error)
QueryTask implement migrator
func (*MigrateMgr) ReclaimTask ¶
func (mgr *MigrateMgr) ReclaimTask(ctx context.Context, idc, taskID string, src []proto.VunitLocation, oldDst proto.VunitLocation, newDst *client.AllocVunitInfo) (err error)
ReclaimTask reclaim migrate task
func (*MigrateMgr) RenewalTask ¶
func (mgr *MigrateMgr) RenewalTask(ctx context.Context, idc, taskID string) (err error)
RenewalTask renewal migrate task
func (*MigrateMgr) ReportWorkerTaskStats ¶
func (mgr *MigrateMgr) ReportWorkerTaskStats(st *api.TaskReportArgs)
ReportWorkerTaskStats implement migrator
func (*MigrateMgr) Run ¶
func (mgr *MigrateMgr) Run()
Run run migrate task do prepare and finish task phase
func (*MigrateMgr) SetClearJunkTasksWhenLoadingFunc ¶
func (mgr *MigrateMgr) SetClearJunkTasksWhenLoadingFunc(clearJunkTasksWhenLoadingFunc clearJunkTasksFunc)
SetClearJunkTasksWhenLoadingFunc set clear junk task func
func (*MigrateMgr) SetLockFailHandleFunc ¶
func (mgr *MigrateMgr) SetLockFailHandleFunc(lockFailHandleFunc lockFailFunc)
SetLockFailHandleFunc set lock failed func
func (*MigrateMgr) StatQueueTaskCnt ¶
func (mgr *MigrateMgr) StatQueueTaskCnt() (inited, prepared, completed int)
StatQueueTaskCnt returns queue task count
func (*MigrateMgr) Stats ¶
func (mgr *MigrateMgr) Stats() api.MigrateTasksStat
Stats implement migrator
func (*MigrateMgr) WaitEnable ¶
func (mgr *MigrateMgr) WaitEnable()
WaitEnable block to wait enable.
type MigratingVuids ¶
MigratingVuids record migrating vuid info
type Migrator ¶
type Migrator interface { AcquireTask(ctx context.Context, idc string) (proto.MigrateTask, error) CancelTask(ctx context.Context, args *api.OperateTaskArgs) error CompleteTask(ctx context.Context, args *api.OperateTaskArgs) error ReclaimTask(ctx context.Context, idc, taskID string, src []proto.VunitLocation, oldDst proto.VunitLocation, newDst *client.AllocVunitInfo) error RenewalTask(ctx context.Context, idc, taskID string) error QueryTask(ctx context.Context, taskID string) (*api.MigrateTaskDetail, error) // status ReportWorkerTaskStats(st *api.TaskReportArgs) StatQueueTaskCnt() (inited, prepared, completed int) Stats() api.MigrateTasksStat // control taskswitch.ISwitcher closer.Closer Load() error Run() }
Migrator base interface of migrate, balancer, disk_droper, manual_migrater.
type OrphanShard ¶
type OrphanShard struct { ClusterID proto.ClusterID `json:"cluster_id"` Vid proto.Vid `json:"vid"` Bid proto.BlobID `json:"bid"` }
OrphanShard orphan shard identification.
type Service ¶
Service rpc service
func NewService ¶
NewService returns scheduler service
func (*Service) CloseKafkaMonitors ¶
func (svr *Service) CloseKafkaMonitors()
func (*Service) HTTPDiskMigratingStats ¶
HTTPDiskMigratingStats returns disk migrating stats
func (*Service) HTTPInspectAcquire ¶
HTTPInspectAcquire acquire inspect task
func (*Service) HTTPInspectComplete ¶
HTTPInspectComplete complete inspect task
func (*Service) HTTPManualMigrateTaskAdd ¶
HTTPManualMigrateTaskAdd adds manual migrate task
func (*Service) HTTPMigrateTaskDetail ¶
HTTPMigrateTaskDetail returns migrate task detail.
func (*Service) HTTPTaskAcquire ¶
HTTPTaskAcquire acquire task
func (*Service) HTTPTaskCancel ¶
HTTPTaskCancel cancel task
func (*Service) HTTPTaskComplete ¶
HTTPTaskComplete complete task
func (*Service) HTTPTaskReclaim ¶
HTTPTaskReclaim reclaim task
func (*Service) HTTPTaskRenewal ¶
HTTPTaskRenewal renewal task
func (*Service) HTTPTaskReport ¶
HTTPTaskReport reports task stats
func (*Service) HTTPUpdateVolume ¶
HTTPUpdateVolume updates volume cache
func (*Service) Handler ¶
func (svr *Service) Handler(w http.ResponseWriter, req *http.Request, f func(http.ResponseWriter, *http.Request))
func (*Service) NewKafkaMonitor ¶
func (*Service) RunKafkaMonitors ¶
func (svr *Service) RunKafkaMonitors()
type ServiceRegisterConfig ¶
type ServiceRegisterConfig struct { TickInterval uint32 `json:"tick_interval"` HeartbeatTicks uint32 `json:"heartbeat_ticks"` ExpiresTicks uint32 `json:"expires_ticks"` Idc string `json:"idc"` Host string `json:"host"` }
ServiceRegisterConfig is service register info
type ShardRepairConfig ¶
type ShardRepairConfig struct { ClusterID proto.ClusterID IDC string TaskPoolSize int `json:"task_pool_size"` NormalHandleBatchCnt int `json:"normal_handle_batch_cnt"` FailHandleBatchCnt int `json:"fail_handle_batch_cnt"` FailMsgConsumeIntervalMs int64 `json:"fail_msg_consume_interval_ms"` OrphanShardLog recordlog.Config `json:"orphan_shard_log"` Kafka ShardRepairKafkaConfig `json:"-"` }
ShardRepairConfig shard repair config
type ShardRepairKafkaConfig ¶
type ShardRepairKafkaConfig struct { BrokerList []string `json:"-"` FailMsgSenderTimeoutMs int64 `json:"-"` Normal TopicConfig `json:"normal"` Failed TopicConfig `json:"failed"` Priority TopicConfig `json:"priority"` }
ShardRepairKafkaConfig is kafka config of shard repair
type ShardRepairMgr ¶
type ShardRepairMgr struct {
// contains filtered or unexported fields
}
ShardRepairMgr shard repair manager
func NewShardRepairMgr ¶
func NewShardRepairMgr( cfg *ShardRepairConfig, clusterTopology IClusterTopology, switchMgr *taskswitch.SwitchMgr, blobnodeCli client.BlobnodeAPI, clusterMgrCli client.ClusterMgrAPI, ) (*ShardRepairMgr, error)
NewShardRepairMgr returns shard repair manager
func (*ShardRepairMgr) Enabled ¶
func (s *ShardRepairMgr) Enabled() bool
Enabled returns true if shard repair task is enabled, otherwise returns false
func (*ShardRepairMgr) GetErrorStats ¶
func (s *ShardRepairMgr) GetErrorStats() (errStats []string, totalErrCnt uint64)
GetErrorStats returns service error stats
func (*ShardRepairMgr) GetTaskStats ¶
GetTaskStats returns task stats
type TopicConfig ¶
TopicConfig topic config
type VolumeCache ¶
type VolumeCache struct {
// contains filtered or unexported fields
}
VolumeCache volume cache
func NewVolumeCache ¶
func NewVolumeCache(client client.ClusterMgrAPI, updateInterval time.Duration) *VolumeCache
NewVolumeCache returns volume cache manager.
func (*VolumeCache) GetVolume ¶
func (c *VolumeCache) GetVolume(vid proto.Vid) (*client.VolumeInfoSimple, error)
GetVolume returns this volume info.
func (*VolumeCache) LoadVolumes ¶
func (c *VolumeCache) LoadVolumes() error
Load list all volumes info memory cache.
func (*VolumeCache) UpdateVolume ¶
func (c *VolumeCache) UpdateVolume(vid proto.Vid) (*client.VolumeInfoSimple, error)
UpdateVolume this volume info cache.
type VolumeInspectMgr ¶
VolumeInspectMgr inspect task manager
func NewVolumeInspectMgr ¶
func NewVolumeInspectMgr( clusterMgrCli client.ClusterMgrAPI, repairShardSender client.ProxyAPI, taskSwitch taskswitch.ISwitcher, cfg *VolumeInspectMgrCfg) *VolumeInspectMgr
NewVolumeInspectMgr returns inspect task manager
func (*VolumeInspectMgr) AcquireInspect ¶
func (mgr *VolumeInspectMgr) AcquireInspect(ctx context.Context) (*proto.VolumeInspectTask, error)
AcquireInspect acquire inspect task
func (*VolumeInspectMgr) CompleteInspect ¶
func (mgr *VolumeInspectMgr) CompleteInspect(ctx context.Context, ret *proto.VolumeInspectRet)
CompleteInspect complete inspect task
func (*VolumeInspectMgr) Enabled ¶
func (mgr *VolumeInspectMgr) Enabled() bool
Enabled returns true if task switch status
func (*VolumeInspectMgr) GetTaskStats ¶
func (mgr *VolumeInspectMgr) GetTaskStats() (finished, timeout [counter.SLOT]int)
GetTaskStats return task stats
type VolumeInspectMgrCfg ¶
type VolumeInspectMgrCfg struct { InspectIntervalS int `json:"inspect_interval_s"` InspectBatch int `json:"inspect_batch"` // iops of list volume info ListVolStep int `json:"list_vol_step"` ListVolIntervalMs int `json:"list_vol_interval_ms"` // timeout of inspect TimeoutMs int `json:"timeout_ms"` }
VolumeInspectMgrCfg inspect task manager config