scheduler

package
v0.0.0-...-303e327 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 22, 2023 License: Apache-2.0 Imports: 37 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DelDone = deleteStatus(iota)
	DelDelay
	DelFailed
	DelUnexpect
)

blob delete status

View Source
const (
	ShardRepairDone = shardRepairStatus(iota)
	ShardRepairFailed
	ShardRepairUnexpect
	ShardRepairOrphan
)

shard repair status

View Source
const (
	ShardRepair = "shard_repair"
)

shard repair name

Variables

View Source
var (
	// ErrNoBalanceVunit no balance volume unit on disk
	ErrNoBalanceVunit = errors.New("no balance volume unit on disk")
	// ErrTooManyBalancingTasks too many balancing tasks
	ErrTooManyBalancingTasks = errors.New("too many balancing tasks")
)
View Source
var ErrBlobnodeServiceUnavailable = errors.New("blobnode service unavailable")

ErrBlobnodeServiceUnavailable blobnode service unavailable

View Source
var ErrFrequentlyUpdate = errors.New("frequently update")

ErrFrequentlyUpdate frequently update

View Source
var ErrVunitLengthNotEqual = errors.New("vunit length not equal")

ErrVunitLengthNotEqual vunit length not equal

Functions

func DeduplicateMsgs

func DeduplicateMsgs(ctx context.Context, delMsgs []*proto.DeleteMsg) (msgs []*proto.DeleteMsg)

DeduplicateMsgs deduplicate delete messages

func DoubleCheckedRun

func DoubleCheckedRun(ctx context.Context, c IClusterTopology, vid proto.Vid, task func(*client.VolumeInfoSimple) (*client.VolumeInfoSimple, error)) error

DoubleCheckedRun: the scheduler updates the volume mapping relation asynchronously, so some tasks (delete or repair) may have started with an old volume mapping.

If a delete runs on the old relation, there will be a garbage shard in the new chunk. ==> garbage shard. If a repair runs on the old relation, there will still be a missing shard in the new chunk. ==> missing shard.

func NewHandler

func NewHandler(service *Service) *rpc.Router

NewHandler returns app server handler

Types

type BalanceMgr

type BalanceMgr struct {
	IMigrator
	// contains filtered or unexported fields
}

BalanceMgr balance manager

func NewBalanceMgr

func NewBalanceMgr(clusterMgrCli client.ClusterMgrAPI, volumeUpdater client.IVolumeUpdater, taskSwitch taskswitch.ISwitcher,
	clusterTopology IClusterTopology, taskLogger recordlog.Encoder, conf *BalanceMgrConfig) *BalanceMgr

NewBalanceMgr returns balance manager

func (*BalanceMgr) Close

func (mgr *BalanceMgr) Close()

Close close balance task manager

func (*BalanceMgr) Run

func (mgr *BalanceMgr) Run()

Run run balance task manager

type BalanceMgrConfig

type BalanceMgrConfig struct {
	MaxDiskFreeChunkCnt int64 `json:"max_disk_free_chunk_cnt"`
	MinDiskFreeChunkCnt int64 `json:"min_disk_free_chunk_cnt"`
	MigrateConfig
}

BalanceMgrConfig balance task manager config

type BlobDeleteConfig

type BlobDeleteConfig struct {
	ClusterID proto.ClusterID

	TaskPoolSize int `json:"task_pool_size"`

	FailMsgConsumeIntervalMs int64 `json:"fail_msg_consume_interval_ms"`

	NormalHandleBatchCnt int `json:"normal_handle_batch_cnt"`
	FailHandleBatchCnt   int `json:"fail_handle_batch_cnt"`

	SafeDelayTimeH  int64            `json:"safe_delay_time_h"`
	DeleteHourRange HourRange        `json:"delete_hour_range"`
	DeleteLog       recordlog.Config `json:"delete_log"`

	Kafka BlobDeleteKafkaConfig `json:"-"`
}

BlobDeleteConfig is blob delete config

type BlobDeleteKafkaConfig

type BlobDeleteKafkaConfig struct {
	BrokerList             []string    `json:"-"`
	FailMsgSenderTimeoutMs int64       `json:"-"`
	Normal                 TopicConfig `json:"normal"`
	Failed                 TopicConfig `json:"failed"`
}

BlobDeleteKafkaConfig is kafka config of blob delete

type BlobDeleteMgr

type BlobDeleteMgr struct {
	// contains filtered or unexported fields
}

BlobDeleteMgr is blob delete manager

func NewBlobDeleteMgr

func NewBlobDeleteMgr(
	cfg *BlobDeleteConfig,
	clusterTopology IClusterTopology,
	blobnodeCli client.BlobnodeAPI,
	switchMgr *taskswitch.SwitchMgr,
	clusterMgrCli client.ClusterMgrAPI,
) (*BlobDeleteMgr, error)

NewBlobDeleteMgr returns blob delete manager

func (*BlobDeleteMgr) Enabled

func (mgr *BlobDeleteMgr) Enabled() bool

Enabled returns true if the delete task switch is enabled, otherwise returns false

func (*BlobDeleteMgr) GetErrorStats

func (mgr *BlobDeleteMgr) GetErrorStats() (errStats []string, totalErrCnt uint64)

GetErrorStats returns error stats

func (*BlobDeleteMgr) GetTaskStats

func (mgr *BlobDeleteMgr) GetTaskStats() (success [counter.SLOT]int, failed [counter.SLOT]int)

GetTaskStats returns task stats

func (*BlobDeleteMgr) RunTask

func (mgr *BlobDeleteMgr) RunTask()

RunTask consumes delete messages

type ClusterTopology

type ClusterTopology struct {
	FreeChunkCnt int64
	MaxChunkCnt  int64
	// contains filtered or unexported fields
}

ClusterTopology cluster topology

type ClusterTopologyMgr

type ClusterTopologyMgr struct {
	closer.Closer
	// contains filtered or unexported fields
}

ClusterTopologyMgr cluster topology manager

func (*ClusterTopologyMgr) GetIDCDisks

func (m *ClusterTopologyMgr) GetIDCDisks(idc string) (disks []*client.DiskInfoSimple)

GetIDCDisks returns disks with IDC

func (*ClusterTopologyMgr) GetIDCs

func (m *ClusterTopologyMgr) GetIDCs() map[string]*IDC

GetIDCs returns IDCs

func (*ClusterTopologyMgr) GetVolume

func (m *ClusterTopologyMgr) GetVolume(vid proto.Vid) (*client.VolumeInfoSimple, error)

func (*ClusterTopologyMgr) IsBrokenDisk

func (m *ClusterTopologyMgr) IsBrokenDisk(diskID proto.DiskID) bool

func (*ClusterTopologyMgr) LoadVolumes

func (m *ClusterTopologyMgr) LoadVolumes() error

func (*ClusterTopologyMgr) MaxFreeChunksDisk

func (m *ClusterTopologyMgr) MaxFreeChunksDisk(idc string) *client.DiskInfoSimple

MaxFreeChunksDisk returns disk which has max free chunks

func (*ClusterTopologyMgr) ReportFreeChunkCnt

func (m *ClusterTopologyMgr) ReportFreeChunkCnt(disk *client.DiskInfoSimple)

ReportFreeChunkCnt report free chunk cnt

func (*ClusterTopologyMgr) UpdateVolume

func (m *ClusterTopologyMgr) UpdateVolume(vid proto.Vid) (*client.VolumeInfoSimple, error)

type Config

type Config struct {
	cmd.Config

	ClusterID proto.ClusterID `json:"cluster_id"`
	Services  Services        `json:"services"`

	TopologyUpdateIntervalMin  int       `json:"topology_update_interval_min"`
	VolumeCacheUpdateIntervalS int       `json:"volume_cache_update_interval_s"`
	FreeChunkCounterBuckets    []float64 `json:"free_chunk_counter_buckets"`

	ClusterMgr clustermgr.Config `json:"clustermgr"`
	Proxy      proxy.LbConfig    `json:"proxy"`
	Blobnode   blobnode.Config   `json:"blobnode"`
	Scheduler  scheduler.Config  `json:"scheduler"`

	Balance       BalanceMgrConfig    `json:"balance"`
	DiskDrop      MigrateConfig       `json:"disk_drop"`
	DiskRepair    MigrateConfig       `json:"disk_repair"`
	ManualMigrate MigrateConfig       `json:"manual_migrate"`
	VolumeInspect VolumeInspectMgrCfg `json:"volume_inspect"`
	TaskLog       recordlog.Config    `json:"task_log"`

	Kafka       KafkaConfig       `json:"kafka"`
	ShardRepair ShardRepairConfig `json:"shard_repair"`
	BlobDelete  BlobDeleteConfig  `json:"blob_delete"`

	ServiceRegister ServiceRegisterConfig `json:"service_register"`
}

Config service config

func (*Config) Follower

func (c *Config) Follower() []string

func (*Config) IsLeader

func (c *Config) IsLeader() bool

func (*Config) Leader

func (c *Config) Leader() string

type DelDoc

type DelDoc struct {
	ClusterID     proto.ClusterID `json:"cid"`
	Bid           proto.BlobID    `json:"bid"`
	Vid           proto.Vid       `json:"vid"`
	Retry         int             `json:"retry"`
	Time          int64           `json:"t"`
	ReqID         string          `json:"rid"`
	ActualDelTime int64           `json:"del_at"` // unix time in S
}

DelDoc is a delete doc information for logging in dellog

type DeletedTask

type DeletedTask struct {
	DiskID      proto.DiskID
	TaskID      string
	DeletedTime time.Time
}

type DiskDropMgr

type DiskDropMgr struct {
	IMigrator
	// contains filtered or unexported fields
}

DiskDropMgr disk drop manager

func NewDiskDropMgr

func NewDiskDropMgr(clusterMgrCli client.ClusterMgrAPI, volumeUpdater client.IVolumeUpdater,
	taskSwitch taskswitch.ISwitcher, taskLogger recordlog.Encoder, conf *MigrateConfig) *DiskDropMgr

NewDiskDropMgr returns disk drop manager

func (*DiskDropMgr) DiskProgress

func (mgr *DiskDropMgr) DiskProgress(ctx context.Context, diskID proto.DiskID) (stats *api.DiskMigratingStats, err error)

func (*DiskDropMgr) Load

func (mgr *DiskDropMgr) Load() (err error)

Load load disk drop task from database

func (*DiskDropMgr) Progress

func (mgr *DiskDropMgr) Progress(ctx context.Context) (migratingDisks []proto.DiskID, total, migrated int)

Progress returns disk drop progress

func (*DiskDropMgr) Run

func (mgr *DiskDropMgr) Run()

Run run disk drop task

type DiskRepairMgr

type DiskRepairMgr struct {
	closer.Closer
	// contains filtered or unexported fields
}

DiskRepairMgr repair task manager

func NewDiskRepairMgr

func NewDiskRepairMgr(clusterMgrCli client.ClusterMgrAPI, taskSwitch taskswitch.ISwitcher, taskLogger recordlog.Encoder, cfg *MigrateConfig) *DiskRepairMgr

NewDiskRepairMgr returns repair manager

func (*DiskRepairMgr) AcquireTask

func (mgr *DiskRepairMgr) AcquireTask(ctx context.Context, idc string) (task proto.MigrateTask, err error)

AcquireTask acquire repair task

func (*DiskRepairMgr) CancelTask

func (mgr *DiskRepairMgr) CancelTask(ctx context.Context, args *api.OperateTaskArgs) error

CancelTask cancel repair task

func (*DiskRepairMgr) CompleteTask

func (mgr *DiskRepairMgr) CompleteTask(ctx context.Context, args *api.OperateTaskArgs) error

CompleteTask complete repair task

func (*DiskRepairMgr) DiskProgress

func (mgr *DiskRepairMgr) DiskProgress(ctx context.Context, diskID proto.DiskID) (stats *api.DiskMigratingStats, err error)

func (*DiskRepairMgr) Enabled

func (mgr *DiskRepairMgr) Enabled() bool

func (*DiskRepairMgr) Load

func (mgr *DiskRepairMgr) Load() error

Load load repair task from database

func (*DiskRepairMgr) Progress

func (mgr *DiskRepairMgr) Progress(ctx context.Context) (migratingDisks []proto.DiskID, total, migrated int)

Progress repair manager progress

func (*DiskRepairMgr) QueryTask

func (mgr *DiskRepairMgr) QueryTask(ctx context.Context, taskID string) (*api.MigrateTaskDetail, error)

QueryTask return task statistics

func (*DiskRepairMgr) ReclaimTask

func (mgr *DiskRepairMgr) ReclaimTask(ctx context.Context,
	idc, taskID string,
	src []proto.VunitLocation,
	oldDst proto.VunitLocation,
	newDst *client.AllocVunitInfo) error

ReclaimTask reclaim repair task

func (*DiskRepairMgr) RenewalTask

func (mgr *DiskRepairMgr) RenewalTask(ctx context.Context, idc, taskID string) error

RenewalTask renewal repair task

func (*DiskRepairMgr) ReportWorkerTaskStats

func (mgr *DiskRepairMgr) ReportWorkerTaskStats(st *api.TaskReportArgs)

ReportWorkerTaskStats reports task stats

func (*DiskRepairMgr) Run

func (mgr *DiskRepairMgr) Run()

Run runs the repair task, which includes the collect/prepare/finish/check phases

func (*DiskRepairMgr) StatQueueTaskCnt

func (mgr *DiskRepairMgr) StatQueueTaskCnt() (inited, prepared, completed int)

StatQueueTaskCnt returns task queue stats

func (*DiskRepairMgr) Stats

func (mgr *DiskRepairMgr) Stats() api.MigrateTasksStat

Stats returns task stats

func (*DiskRepairMgr) WaitEnable

func (mgr *DiskRepairMgr) WaitEnable()

type Host

type Host struct {
	FreeChunkCnt int64 // host free chunk count
	MaxChunkCnt  int64 // total chunk count
	// contains filtered or unexported fields
}

Host host info

type HourRange

type HourRange struct {
	From int `json:"from"`
	To   int `json:"to"`
}

func (HourRange) Valid

func (t HourRange) Valid() bool

type IClusterTopology

type IClusterTopology interface {
	GetIDCs() map[string]*IDC
	GetIDCDisks(idc string) (disks []*client.DiskInfoSimple)
	MaxFreeChunksDisk(idc string) *client.DiskInfoSimple
	IsBrokenDisk(diskID proto.DiskID) bool
	IVolumeCache
	closer.Closer
}

IClusterTopology defines the interface of cluster topology

func NewClusterTopologyMgr

func NewClusterTopologyMgr(topologyClient client.ClusterMgrAPI, cfg *clusterTopologyConfig) IClusterTopology

NewClusterTopologyMgr returns cluster topology manager

type IDC

type IDC struct {
	FreeChunkCnt int64
	MaxChunkCnt  int64
	// contains filtered or unexported fields
}

IDC idc info

type IDisKMigrator

type IDisKMigrator interface {
	Migrator
	Progress(ctx context.Context) (migratingDisks []proto.DiskID, total, migrated int)
	DiskProgress(ctx context.Context, diskID proto.DiskID) (stats *api.DiskMigratingStats, err error)
}

IDisKMigrator base interface of disk migrate, such as disk repair and disk drop

type IManualMigrator

type IManualMigrator interface {
	Migrator
	AddManualTask(ctx context.Context, vuid proto.Vuid, forbiddenDirectDownload bool) (err error)
}

IManualMigrator interface of manual migrator

type IMigrator

type IMigrator interface {
	Migrator
	// inner interface
	SetLockFailHandleFunc(lockFailHandleFunc lockFailFunc)
	SetClearJunkTasksWhenLoadingFunc(clearJunkTasksWhenLoadingFunc clearJunkTasksFunc)
	GetMigratingDiskNum() int
	IsMigratingDisk(diskID proto.DiskID) bool
	ClearDeletedTasks(diskID proto.DiskID)
	ClearDeletedTaskByID(diskID proto.DiskID, taskID string)
	IsDeletedTask(task *proto.MigrateTask) bool
	DeletedTasks() []DeletedTask
	AddTask(ctx context.Context, task *proto.MigrateTask)
	GetTask(ctx context.Context, taskID string) (*proto.MigrateTask, error)
	ListAllTask(ctx context.Context) (tasks []*proto.MigrateTask, err error)
	ListAllTaskByDiskID(ctx context.Context, diskID proto.DiskID) (tasks []*proto.MigrateTask, err error)
	FinishTaskInAdvanceWhenLockFail(ctx context.Context, task *proto.MigrateTask)
}

IMigrator interface of common migrator

type ITaskRunner

type ITaskRunner interface {
	Enabled() bool
	RunTask()
	GetTaskStats() (success, failed [counter.SLOT]int)
	GetErrorStats() (errStats []string, totalErrCnt uint64)
}

ITaskRunner defines the interface of task running status.

type IVolumeCache

type IVolumeCache interface {
	UpdateVolume(vid proto.Vid) (*client.VolumeInfoSimple, error)
	GetVolume(vid proto.Vid) (*client.VolumeInfoSimple, error)
	LoadVolumes() error
}

IVolumeCache defines the interface used for the volume cache manager

type IVolumeInspector

type IVolumeInspector interface {
	AcquireInspect(ctx context.Context) (*proto.VolumeInspectTask, error)
	CompleteInspect(ctx context.Context, ret *proto.VolumeInspectRet)
	GetTaskStats() (finished, timeout [counter.SLOT]int)
	Enabled() bool
	Run()
	closer.Closer
}

IVolumeInspector defines the interface of the volume inspect manager

type KafkaConfig

type KafkaConfig struct {
	BrokerList             []string               `json:"broker_list"`
	FailMsgSenderTimeoutMs int64                  `json:"fail_msg_sender_timeout_ms"`
	ShardRepair            ShardRepairKafkaConfig `json:"shard_repair"`
	BlobDelete             BlobDeleteKafkaConfig  `json:"blob_delete"`
}

KafkaConfig kafka config

type MMigrator

type MMigrator interface {
	IMigrator
	IDisKMigrator
	IManualMigrator
}

MMigrator merged interfaces for mocking.

type ManualMigrateMgr

type ManualMigrateMgr struct {
	IMigrator
	// contains filtered or unexported fields
}

ManualMigrateMgr manual migrate manager

func NewManualMigrateMgr

func NewManualMigrateMgr(clusterMgrCli client.ClusterMgrAPI, volumeUpdater client.IVolumeUpdater,
	taskLogger recordlog.Encoder, conf *MigrateConfig) *ManualMigrateMgr

NewManualMigrateMgr returns manual migrate manager

func (*ManualMigrateMgr) AddManualTask

func (mgr *ManualMigrateMgr) AddManualTask(ctx context.Context, vuid proto.Vuid, forbiddenDirectDownload bool) (err error)

AddManualTask add manual migrate task

type MigrateConfig

type MigrateConfig struct {
	ClusterID proto.ClusterID `json:"-"` // fill in config.go
	base.TaskCommonConfig
}

MigrateConfig migrate config

type MigrateMgr

type MigrateMgr struct {
	closer.Closer
	// contains filtered or unexported fields
}

MigrateMgr migrate manager

func NewMigrateMgr

func NewMigrateMgr(
	clusterMgrCli client.ClusterMgrAPI,
	volumeUpdater client.IVolumeUpdater,
	taskSwitch taskswitch.ISwitcher,
	taskLogger recordlog.Encoder,
	conf *MigrateConfig,
	taskType proto.TaskType,
) *MigrateMgr

NewMigrateMgr returns migrate manager

func (*MigrateMgr) AcquireTask

func (mgr *MigrateMgr) AcquireTask(ctx context.Context, idc string) (task proto.MigrateTask, err error)

AcquireTask acquire migrate task

func (*MigrateMgr) AddTask

func (mgr *MigrateMgr) AddTask(ctx context.Context, task *proto.MigrateTask)

AddTask adds migrate task

func (*MigrateMgr) CancelTask

func (mgr *MigrateMgr) CancelTask(ctx context.Context, args *api.OperateTaskArgs) (err error)

CancelTask cancel migrate task

func (*MigrateMgr) ClearDeletedTaskByID

func (mgr *MigrateMgr) ClearDeletedTaskByID(diskID proto.DiskID, taskID string)

ClearDeletedTaskByID clear migrated task

func (*MigrateMgr) ClearDeletedTasks

func (mgr *MigrateMgr) ClearDeletedTasks(diskID proto.DiskID)

ClearDeletedTasks clear tasks when disk is migrated

func (*MigrateMgr) CompleteTask

func (mgr *MigrateMgr) CompleteTask(ctx context.Context, args *api.OperateTaskArgs) (err error)

CompleteTask complete migrate task

func (*MigrateMgr) DeletedTasks

func (mgr *MigrateMgr) DeletedTasks() []DeletedTask

DeletedTasks returns deleted tasks

func (*MigrateMgr) Enabled

func (mgr *MigrateMgr) Enabled() bool

Enabled returns enable or not.

func (*MigrateMgr) FinishTaskInAdvanceWhenLockFail

func (mgr *MigrateMgr) FinishTaskInAdvanceWhenLockFail(ctx context.Context, task *proto.MigrateTask)

FinishTaskInAdvanceWhenLockFail finish migrate task in advance when lock volume failed

func (*MigrateMgr) GetMigratingDiskNum

func (mgr *MigrateMgr) GetMigratingDiskNum() int

GetMigratingDiskNum returns migrating disk count

func (*MigrateMgr) GetTask

func (mgr *MigrateMgr) GetTask(ctx context.Context, taskID string) (*proto.MigrateTask, error)

GetTask returns task in db

func (*MigrateMgr) IsDeletedTask

func (mgr *MigrateMgr) IsDeletedTask(task *proto.MigrateTask) bool

IsDeletedTask return true if the task is deleted

func (*MigrateMgr) IsMigratingDisk

func (mgr *MigrateMgr) IsMigratingDisk(diskID proto.DiskID) bool

IsMigratingDisk returns true if disk is migrating

func (*MigrateMgr) ListAllTask

func (mgr *MigrateMgr) ListAllTask(ctx context.Context) (tasks []*proto.MigrateTask, err error)

ListAllTask returns all migrate task

func (*MigrateMgr) ListAllTaskByDiskID

func (mgr *MigrateMgr) ListAllTaskByDiskID(ctx context.Context, diskID proto.DiskID) (tasks []*proto.MigrateTask, err error)

ListAllTaskByDiskID return all task by diskID

func (*MigrateMgr) Load

func (mgr *MigrateMgr) Load() (err error)

Load load migrate task from database

func (*MigrateMgr) QueryTask

func (mgr *MigrateMgr) QueryTask(ctx context.Context, taskID string) (*api.MigrateTaskDetail, error)

QueryTask implement migrator

func (*MigrateMgr) ReclaimTask

func (mgr *MigrateMgr) ReclaimTask(ctx context.Context, idc, taskID string,
	src []proto.VunitLocation, oldDst proto.VunitLocation, newDst *client.AllocVunitInfo) (err error)

ReclaimTask reclaim migrate task

func (*MigrateMgr) RenewalTask

func (mgr *MigrateMgr) RenewalTask(ctx context.Context, idc, taskID string) (err error)

RenewalTask renewal migrate task

func (*MigrateMgr) ReportWorkerTaskStats

func (mgr *MigrateMgr) ReportWorkerTaskStats(st *api.TaskReportArgs)

ReportWorkerTaskStats implement migrator

func (*MigrateMgr) Run

func (mgr *MigrateMgr) Run()

Run runs the migrate task, performing the prepare and finish task phases

func (*MigrateMgr) SetClearJunkTasksWhenLoadingFunc

func (mgr *MigrateMgr) SetClearJunkTasksWhenLoadingFunc(clearJunkTasksWhenLoadingFunc clearJunkTasksFunc)

SetClearJunkTasksWhenLoadingFunc set clear junk task func

func (*MigrateMgr) SetLockFailHandleFunc

func (mgr *MigrateMgr) SetLockFailHandleFunc(lockFailHandleFunc lockFailFunc)

SetLockFailHandleFunc set lock failed func

func (*MigrateMgr) StatQueueTaskCnt

func (mgr *MigrateMgr) StatQueueTaskCnt() (inited, prepared, completed int)

StatQueueTaskCnt returns queue task count

func (*MigrateMgr) Stats

func (mgr *MigrateMgr) Stats() api.MigrateTasksStat

Stats implement migrator

func (*MigrateMgr) WaitEnable

func (mgr *MigrateMgr) WaitEnable()

WaitEnable block to wait enable.

type MigratingVuids

type MigratingVuids map[proto.Vuid]string

MigratingVuids record migrating vuid info

type Migrator

type Migrator interface {
	AcquireTask(ctx context.Context, idc string) (proto.MigrateTask, error)
	CancelTask(ctx context.Context, args *api.OperateTaskArgs) error
	CompleteTask(ctx context.Context, args *api.OperateTaskArgs) error
	ReclaimTask(ctx context.Context, idc, taskID string,
		src []proto.VunitLocation, oldDst proto.VunitLocation, newDst *client.AllocVunitInfo) error
	RenewalTask(ctx context.Context, idc, taskID string) error
	QueryTask(ctx context.Context, taskID string) (*api.MigrateTaskDetail, error)
	// status
	ReportWorkerTaskStats(st *api.TaskReportArgs)
	StatQueueTaskCnt() (inited, prepared, completed int)
	Stats() api.MigrateTasksStat
	// control
	taskswitch.ISwitcher
	closer.Closer
	Load() error
	Run()
}

Migrator is the base interface of migrate, balancer, disk_dropper, manual_migrator.

type OrphanShard

type OrphanShard struct {
	ClusterID proto.ClusterID `json:"cluster_id"`
	Vid       proto.Vid       `json:"vid"`
	Bid       proto.BlobID    `json:"bid"`
}

OrphanShard orphan shard identification.

type Rack

type Rack struct {
	FreeChunkCnt int64
	MaxChunkCnt  int64
	// contains filtered or unexported fields
}

Rack rack info

type Service

type Service struct {
	ClusterID proto.ClusterID
	// contains filtered or unexported fields
}

Service rpc service

func NewService

func NewService(conf *Config) (svr *Service, err error)

NewService returns scheduler service

func (*Service) Close

func (svr *Service) Close()

Close closes the service safely

func (*Service) CloseKafkaMonitors

func (svr *Service) CloseKafkaMonitors()

func (*Service) HTTPDiskMigratingStats

func (svr *Service) HTTPDiskMigratingStats(c *rpc.Context)

HTTPDiskMigratingStats returns disk migrating stats

func (*Service) HTTPInspectAcquire

func (svr *Service) HTTPInspectAcquire(c *rpc.Context)

HTTPInspectAcquire acquire inspect task

func (*Service) HTTPInspectComplete

func (svr *Service) HTTPInspectComplete(c *rpc.Context)

HTTPInspectComplete complete inspect task

func (*Service) HTTPManualMigrateTaskAdd

func (svr *Service) HTTPManualMigrateTaskAdd(c *rpc.Context)

HTTPManualMigrateTaskAdd adds manual migrate task

func (*Service) HTTPMigrateTaskDetail

func (svr *Service) HTTPMigrateTaskDetail(c *rpc.Context)

HTTPMigrateTaskDetail returns migrate task detail.

func (*Service) HTTPStats

func (svr *Service) HTTPStats(c *rpc.Context)

HTTPStats returns service stats

func (*Service) HTTPTaskAcquire

func (svr *Service) HTTPTaskAcquire(c *rpc.Context)

HTTPTaskAcquire acquire task

func (*Service) HTTPTaskCancel

func (svr *Service) HTTPTaskCancel(c *rpc.Context)

HTTPTaskCancel cancel task

func (*Service) HTTPTaskComplete

func (svr *Service) HTTPTaskComplete(c *rpc.Context)

HTTPTaskComplete complete task

func (*Service) HTTPTaskReclaim

func (svr *Service) HTTPTaskReclaim(c *rpc.Context)

HTTPTaskReclaim reclaim task

func (*Service) HTTPTaskRenewal

func (svr *Service) HTTPTaskRenewal(c *rpc.Context)

HTTPTaskRenewal renewal task

func (*Service) HTTPTaskReport

func (svr *Service) HTTPTaskReport(c *rpc.Context)

HTTPTaskReport reports task stats

func (*Service) HTTPUpdateVolume

func (svr *Service) HTTPUpdateVolume(c *rpc.Context)

HTTPUpdateVolume updates volume cache

func (*Service) Handler

func (svr *Service) Handler(w http.ResponseWriter, req *http.Request, f func(http.ResponseWriter, *http.Request))

func (*Service) LoadVolInfo

func (svr *Service) LoadVolInfo() error

LoadVolInfo load volume info

func (*Service) NewKafkaMonitor

func (svr *Service) NewKafkaMonitor(clusterID proto.ClusterID, access base.IConsumerOffset) error

func (*Service) Run

func (svr *Service) Run()

Run run task

func (*Service) RunKafkaMonitors

func (svr *Service) RunKafkaMonitors()

func (*Service) RunTask

func (svr *Service) RunTask()

RunTask run shard repair and blob delete tasks

type ServiceRegisterConfig

type ServiceRegisterConfig struct {
	TickInterval   uint32 `json:"tick_interval"`
	HeartbeatTicks uint32 `json:"heartbeat_ticks"`
	ExpiresTicks   uint32 `json:"expires_ticks"`
	Idc            string `json:"idc"`
	Host           string `json:"host"`
}

ServiceRegisterConfig is service register info

type Services

type Services struct {
	Leader  uint64            `json:"leader"`
	NodeID  uint64            `json:"node_id"`
	Members map[uint64]string `json:"members"`
}

type ShardRepairConfig

type ShardRepairConfig struct {
	ClusterID proto.ClusterID
	IDC       string

	TaskPoolSize         int `json:"task_pool_size"`
	NormalHandleBatchCnt int `json:"normal_handle_batch_cnt"`

	FailHandleBatchCnt       int   `json:"fail_handle_batch_cnt"`
	FailMsgConsumeIntervalMs int64 `json:"fail_msg_consume_interval_ms"`

	OrphanShardLog recordlog.Config `json:"orphan_shard_log"`

	Kafka ShardRepairKafkaConfig `json:"-"`
}

ShardRepairConfig shard repair config

type ShardRepairKafkaConfig

type ShardRepairKafkaConfig struct {
	BrokerList             []string    `json:"-"`
	FailMsgSenderTimeoutMs int64       `json:"-"`
	Normal                 TopicConfig `json:"normal"`
	Failed                 TopicConfig `json:"failed"`
	Priority               TopicConfig `json:"priority"`
}

ShardRepairKafkaConfig is kafka config of shard repair

type ShardRepairMgr

type ShardRepairMgr struct {
	// contains filtered or unexported fields
}

ShardRepairMgr shard repair manager

func NewShardRepairMgr

func NewShardRepairMgr(
	cfg *ShardRepairConfig,
	clusterTopology IClusterTopology,
	switchMgr *taskswitch.SwitchMgr,
	blobnodeCli client.BlobnodeAPI,
	clusterMgrCli client.ClusterMgrAPI,
) (*ShardRepairMgr, error)

NewShardRepairMgr returns shard repair manager

func (*ShardRepairMgr) Enabled

func (s *ShardRepairMgr) Enabled() bool

Enabled returns true if shard repair task is enabled, otherwise returns false

func (*ShardRepairMgr) GetErrorStats

func (s *ShardRepairMgr) GetErrorStats() (errStats []string, totalErrCnt uint64)

GetErrorStats returns service error stats

func (*ShardRepairMgr) GetTaskStats

func (s *ShardRepairMgr) GetTaskStats() (success [counter.SLOT]int, failed [counter.SLOT]int)

GetTaskStats returns task stats

func (*ShardRepairMgr) RunTask

func (s *ShardRepairMgr) RunTask()

RunTask run shard repair task

type TopicConfig

type TopicConfig struct {
	Topic      string  `json:"topic"`
	Partitions []int32 `json:"partitions"`
}

TopicConfig topic config

type VolumeCache

type VolumeCache struct {
	// contains filtered or unexported fields
}

VolumeCache volume cache

func NewVolumeCache

func NewVolumeCache(client client.ClusterMgrAPI, updateInterval time.Duration) *VolumeCache

NewVolumeCache returns volume cache manager.

func (*VolumeCache) GetVolume

func (c *VolumeCache) GetVolume(vid proto.Vid) (*client.VolumeInfoSimple, error)

GetVolume returns this volume info.

func (*VolumeCache) LoadVolumes

func (c *VolumeCache) LoadVolumes() error

LoadVolumes lists all volumes info into the memory cache.

func (*VolumeCache) UpdateVolume

func (c *VolumeCache) UpdateVolume(vid proto.Vid) (*client.VolumeInfoSimple, error)

UpdateVolume updates this volume's info in the cache.

type VolumeInspectMgr

type VolumeInspectMgr struct {
	closer.Closer
	// contains filtered or unexported fields
}

VolumeInspectMgr inspect task manager

func NewVolumeInspectMgr

func NewVolumeInspectMgr(
	clusterMgrCli client.ClusterMgrAPI,
	repairShardSender client.ProxyAPI,
	taskSwitch taskswitch.ISwitcher, cfg *VolumeInspectMgrCfg) *VolumeInspectMgr

NewVolumeInspectMgr returns inspect task manager

func (*VolumeInspectMgr) AcquireInspect

func (mgr *VolumeInspectMgr) AcquireInspect(ctx context.Context) (*proto.VolumeInspectTask, error)

AcquireInspect acquire inspect task

func (*VolumeInspectMgr) CompleteInspect

func (mgr *VolumeInspectMgr) CompleteInspect(ctx context.Context, ret *proto.VolumeInspectRet)

CompleteInspect complete inspect task

func (*VolumeInspectMgr) Enabled

func (mgr *VolumeInspectMgr) Enabled() bool

Enabled returns true if the task switch is enabled

func (*VolumeInspectMgr) GetTaskStats

func (mgr *VolumeInspectMgr) GetTaskStats() (finished, timeout [counter.SLOT]int)

GetTaskStats return task stats

func (*VolumeInspectMgr) Run

func (mgr *VolumeInspectMgr) Run()

Run run inspect task manager

type VolumeInspectMgrCfg

type VolumeInspectMgrCfg struct {
	InspectIntervalS int `json:"inspect_interval_s"`
	InspectBatch     int `json:"inspect_batch"`

	// iops of list volume info
	ListVolStep       int `json:"list_vol_step"`
	ListVolIntervalMs int `json:"list_vol_interval_ms"`

	// timeout of inspect
	TimeoutMs int `json:"timeout_ms"`
}

VolumeInspectMgrCfg inspect task manager config

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL