schedule

package
v0.3.0 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 9, 2022 License: Apache-2.0 Imports: 36 Imported by: 0

Documentation


Constants

const (
	DispatchFromHeartBeat     = "heartbeat"
	DispatchFromNotifierQueue = "active push"
	DispatchFromCreate        = "create"
)

The source of a dispatched resource.

const DefaultCacheSize = 1000

DefaultCacheSize is the default length of the waiting list.

Variables

var (
	// PushOperatorTickInterval is the interval at which to try pushing operators.
	PushOperatorTickInterval = 500 * time.Millisecond
	// ContainerBalanceBaseTime represents the base time of balance rate.
	ContainerBalanceBaseTime float64 = 60
)

var PriorityWeight = []float64{1.0, 4.0, 9.0}

PriorityWeight is used to represent the weight of different priorities of operators.
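
How the package consumes these weights is not shown on this page. As a hedged illustration only, weights like these could drive a weighted choice among three priority levels, where level i is chosen with probability weight[i] / sum(weights). The helper name `pickByWeight` is hypothetical and is not part of the package.

```go
package main

import "fmt"

// priorityWeight mirrors the documented PriorityWeight values.
var priorityWeight = []float64{1.0, 4.0, 9.0}

// pickByWeight is a hypothetical helper: given r in [0, 1), it returns the
// first index whose cumulative weight exceeds r * sum(weights).
func pickByWeight(weights []float64, r float64) int {
	total := 0.0
	for _, w := range weights {
		total += w
	}
	target := r * total
	sum := 0.0
	for i, w := range weights {
		sum += w
		if target < sum {
			return i
		}
	}
	return len(weights) - 1
}

func main() {
	// In real use r would come from math/rand; fixed values keep the demo deterministic.
	for _, r := range []float64{0.0, 0.2, 0.9} {
		fmt.Println(r, "->", pickByWeight(priorityWeight, r))
	}
}
```

With weights 1, 4 and 9, the three levels would be chosen roughly 7%, 29% and 64% of the time under this scheme.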

Functions

func ApplyOperator

func ApplyOperator(mc *mockcluster.Cluster, op *operator.Operator)

ApplyOperator applies the operator. For testing only.

func ApplyOperatorStep

func ApplyOperatorStep(resource *core.CachedResource, op *operator.Operator) *core.CachedResource

ApplyOperatorStep applies the operator step. For testing only.

func DecodeConfig

func DecodeConfig(data []byte, v interface{}) error

DecodeConfig decodes the custom config for each scheduler.

func EncodeConfig

func EncodeConfig(v interface{}) ([]byte, error)

EncodeConfig encodes the custom config for each scheduler.
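
The wire format is not stated on this page. The sketch below assumes JSON (the package also exposes ConfigJSONDecoder, which suggests it) and uses local stand-ins `encodeConfig` and `decodeConfig` with the same shape as the documented functions. The `balanceConfig` struct is hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// balanceConfig is a hypothetical scheduler config used only for illustration.
type balanceConfig struct {
	Name   string   `json:"name"`
	Ranges []string `json:"ranges"`
}

// encodeConfig and decodeConfig are local stand-ins with the same shape as
// EncodeConfig and DecodeConfig. Using JSON here is an assumption.
func encodeConfig(v interface{}) ([]byte, error) { return json.Marshal(v) }

func decodeConfig(data []byte, v interface{}) error { return json.Unmarshal(data, v) }

// roundTrip encodes a config and decodes it back into a fresh value.
func roundTrip(in balanceConfig) (balanceConfig, error) {
	var out balanceConfig
	data, err := encodeConfig(in)
	if err != nil {
		return out, err
	}
	err = decodeConfig(data, &out)
	return out, err
}

func main() {
	out, err := roundTrip(balanceConfig{Name: "balance-leader", Ranges: []string{"a", "z"}})
	fmt.Println(out, err)
}
```

Note that the decode target must be passed as a pointer so the decoder can fill it in.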

func FindSchedulerTypeByName

func FindSchedulerTypeByName(name string) string

FindSchedulerTypeByName finds the scheduler type for the specified name.

func NewTotalOpInfluence

func NewTotalOpInfluence(operators []*operator.Operator, cluster opt.Cluster) operator.OpInfluence

NewTotalOpInfluence creates an OpInfluence.

func RegisterScheduler

func RegisterScheduler(typ string, createFn CreateSchedulerFunc)

RegisterScheduler binds a scheduler creator. It should be called in init() func of a package.
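
The register-in-init pattern can be sketched with local stand-ins. Everything below is a simplified illustration: `scheduler`, `createFunc`, `registerScheduler` and `createScheduler` are hypothetical reductions of Scheduler, CreateSchedulerFunc, RegisterScheduler and CreateScheduler, and the duplicate-registration panic is a choice made for this sketch rather than documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// scheduler and createFunc are simplified stand-ins for Scheduler and
// CreateSchedulerFunc; the real types carry more methods and arguments.
type scheduler interface{ GetType() string }

type createFunc func() (scheduler, error)

var registry = map[string]createFunc{}

// registerScheduler binds a creator to a type name.
// Panicking on a duplicate is this sketch's choice, not documented behavior.
func registerScheduler(typ string, fn createFunc) {
	if _, ok := registry[typ]; ok {
		panic("duplicated scheduler type: " + typ)
	}
	registry[typ] = fn
}

// createScheduler looks up the creator registered for typ and invokes it.
func createScheduler(typ string) (scheduler, error) {
	fn, ok := registry[typ]
	if !ok {
		return nil, errors.New("scheduler type not registered: " + typ)
	}
	return fn()
}

type noopScheduler struct{}

func (noopScheduler) GetType() string { return "noop" }

// init runs before main, so the type is registered before any lookup.
func init() {
	registerScheduler("noop", func() (scheduler, error) { return noopScheduler{}, nil })
}

func main() {
	s, err := createScheduler("noop")
	fmt.Println(s.GetType(), err)
	_, err = createScheduler("missing")
	fmt.Println(err)
}
```

Registering from init() means a scheduler becomes available simply by importing its package, with no central list to maintain.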

func RegisterSliceDecoderBuilder

func RegisterSliceDecoderBuilder(typ string, builder ConfigSliceDecoderBuilder)

RegisterSliceDecoderBuilder registers a builder that converts arguments to a config. It should be called in the init() func of a package.

Types

type Bucket

type Bucket struct {
	// contains filtered or unexported fields
}

Bucket is used to maintain the operators created by a specific scheduler.

type CheckerController

type CheckerController struct {
	// contains filtered or unexported fields
}

CheckerController is used to manage all checkers.

func NewCheckerController

func NewCheckerController(ctx context.Context, cluster opt.Cluster, ruleManager *placement.RuleManager, opController *OperatorController) *CheckerController

NewCheckerController creates a new CheckerController. TODO: isSupportMerge should be removed.

func (*CheckerController) AddWaitingResource

func (c *CheckerController) AddWaitingResource(res *core.CachedResource)

AddWaitingResource adds the resource to the waiting list.

func (*CheckerController) CheckResource

func (c *CheckerController) CheckResource(res *core.CachedResource) []*operator.Operator

CheckResource will check the resource and add a new operator if needed.

func (*CheckerController) FillReplicas

func (c *CheckerController) FillReplicas(res *core.CachedResource, leastPeers int) error

FillReplicas fills replicas for an empty resource.

func (*CheckerController) GetMergeChecker

func (c *CheckerController) GetMergeChecker() *checker.MergeChecker

GetMergeChecker returns the merge checker.

func (*CheckerController) GetWaitingResources

func (c *CheckerController) GetWaitingResources() []*cache.Item

GetWaitingResources returns the resources in the waiting list.

func (*CheckerController) RemoveWaitingResource

func (c *CheckerController) RemoveWaitingResource(id uint64)

RemoveWaitingResource removes the resource from the waiting list.

type ConfigDecoder

type ConfigDecoder func(v interface{}) error

ConfigDecoder is used to decode the config.

func ConfigJSONDecoder

func ConfigJSONDecoder(data []byte) ConfigDecoder

ConfigJSONDecoder is used to build a JSON decoder of the config.

func ConfigSliceDecoder

func ConfigSliceDecoder(name string, args []string) ConfigDecoder

ConfigSliceDecoder is the default decoder for the config.

type ConfigSliceDecoderBuilder

type ConfigSliceDecoderBuilder func([]string) ConfigDecoder

ConfigSliceDecoderBuilder is used to build a slice decoder of the config.
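
Because ConfigDecoder is a function type, a decoder is naturally built as a closure that captures its input and decodes it later into whatever value the scheduler supplies. The sketch below shows that shape with a local `jsonDecoder` stand-in for ConfigJSONDecoder; its body is an assumption about how such a decoder could be written, and `evictConfig` is a hypothetical config type.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// configDecoder mirrors the documented ConfigDecoder type.
type configDecoder func(v interface{}) error

// jsonDecoder is a stand-in for ConfigJSONDecoder: it captures the raw bytes
// and decodes them into whatever value the caller passes later.
func jsonDecoder(data []byte) configDecoder {
	return func(v interface{}) error {
		return json.Unmarshal(data, v)
	}
}

// evictConfig is a hypothetical scheduler config.
type evictConfig struct {
	ContainerID uint64 `json:"container_id"`
}

func main() {
	dec := jsonDecoder([]byte(`{"container_id": 7}`))
	var cfg evictConfig
	if err := dec(&cfg); err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(cfg.ContainerID)
}
```

This keeps the scheduler constructor independent of where the config came from: it receives a decoder and only needs to know its own config type.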

type CreateSchedulerFunc

type CreateSchedulerFunc func(opController *OperatorController, storage storage.Storage, dec ConfigDecoder) (Scheduler, error)

CreateSchedulerFunc is a function that creates a scheduler.

type OperatorController

type OperatorController struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

OperatorController is used to limit the speed of scheduling.

func NewOperatorController

func NewOperatorController(ctx context.Context, cluster opt.Cluster, hbStreams *hbstream.HeartbeatStreams) *OperatorController

NewOperatorController creates an OperatorController.

func (*OperatorController) AddOperator

func (oc *OperatorController) AddOperator(ops ...*operator.Operator) bool

AddOperator adds operators to the running operators.

func (*OperatorController) AddWaitingOperator

func (oc *OperatorController) AddWaitingOperator(ops ...*operator.Operator) int

AddWaitingOperator adds operators to the waiting operators.

func (*OperatorController) CollectContainerLimitMetrics

func (oc *OperatorController) CollectContainerLimitMetrics()

CollectContainerLimitMetrics collects the metrics about the container limit.

func (*OperatorController) Ctx

Ctx returns a context which will be canceled once RaftCluster is stopped. For now, it is only used to control the lifetime of TTL cache in schedulers.

func (*OperatorController) Dispatch

func (oc *OperatorController) Dispatch(res *core.CachedResource, source string)

Dispatch is used to dispatch the operator of a resource.

func (*OperatorController) DispatchDestroyDirectly added in v0.2.0

func (oc *OperatorController) DispatchDestroyDirectly(res *core.CachedResource, source string)

DispatchDestroyDirectly sends the DestroyDirect command to the current container because the resource has been removed.

func (*OperatorController) ExceedContainerLimit

func (oc *OperatorController) ExceedContainerLimit(ops ...*operator.Operator) bool

ExceedContainerLimit returns true if the container exceeds the cost limit after adding the operator. Otherwise, returns false.

func (*OperatorController) GetCluster

func (oc *OperatorController) GetCluster() opt.Cluster

GetCluster exposes the cluster to evict-scheduler so that it can check container status.

func (*OperatorController) GetHistory

func (oc *OperatorController) GetHistory(start time.Time) []operator.OpHistory

GetHistory gets operators' history.

func (*OperatorController) GetLeaderSchedulePolicy

func (oc *OperatorController) GetLeaderSchedulePolicy() core.SchedulePolicy

GetLeaderSchedulePolicy returns the leader schedule policy.

func (*OperatorController) GetOpInfluence

func (oc *OperatorController) GetOpInfluence(cluster opt.Cluster) operator.OpInfluence

GetOpInfluence gets OpInfluence.

func (*OperatorController) GetOperator

func (oc *OperatorController) GetOperator(resID uint64) *operator.Operator

GetOperator gets the operator for the given resource.

func (*OperatorController) GetOperatorStatus

func (oc *OperatorController) GetOperatorStatus(id uint64) *OperatorWithStatus

GetOperatorStatus gets the operator and its status for the specified ID.

func (*OperatorController) GetOperators

func (oc *OperatorController) GetOperators() []*operator.Operator

GetOperators gets operators from the running operators.

func (*OperatorController) GetWaitingOperators

func (oc *OperatorController) GetWaitingOperators() []*operator.Operator

GetWaitingOperators gets operators from the waiting operators.

func (*OperatorController) OperatorCount

func (oc *OperatorController) OperatorCount(mask operator.OpKind) uint64

OperatorCount gets the count of operators filtered by mask.

func (*OperatorController) PromoteWaitingOperator

func (oc *OperatorController) PromoteWaitingOperator()

PromoteWaitingOperator promotes operators from the waiting operators.

func (*OperatorController) PruneHistory

func (oc *OperatorController) PruneHistory()

PruneHistory prunes a part of operators' history.

func (*OperatorController) PushOperators

func (oc *OperatorController) PushOperators()

PushOperators periodically pushes unfinished operators to the executor (your storage application).
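
A periodic push loop of this kind is commonly written with a time.Ticker and a context for shutdown. The sketch below is a generic illustration of that pattern, not the package's implementation. `runPusher` and `pushThrice` are hypothetical names, and the demo uses a 1 ms interval where a real setup would pass something like PushOperatorTickInterval.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runPusher calls push on every tick until ctx is canceled and returns the
// number of calls made.
func runPusher(ctx context.Context, interval time.Duration, push func()) int {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	n := 0
	for {
		select {
		case <-ctx.Done():
			return n
		case <-ticker.C:
			push()
			n++
			// Stop right away if push canceled the context.
			if ctx.Err() != nil {
				return n
			}
		}
	}
}

// pushThrice is a demo helper: it cancels the loop during the third push.
func pushThrice() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	calls := 0
	return runPusher(ctx, time.Millisecond, func() {
		calls++
		if calls == 3 {
			cancel()
		}
	})
}

func main() {
	fmt.Println(pushThrice())
}
```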

func (*OperatorController) RemoveOperator

func (oc *OperatorController) RemoveOperator(op *operator.Operator, extra string) bool

RemoveOperator removes an operator from the running operators.

func (*OperatorController) SendScheduleCommand

func (oc *OperatorController) SendScheduleCommand(res *core.CachedResource, step operator.OpStep, source string)

SendScheduleCommand sends a command to the resource.

func (*OperatorController) SetOperator

func (oc *OperatorController) SetOperator(op *operator.Operator)

SetOperator is only used for testing.

type OperatorRecords

type OperatorRecords struct {
	// contains filtered or unexported fields
}

OperatorRecords retains the operator and its status for a while.

func NewOperatorRecords

func NewOperatorRecords(ctx context.Context) *OperatorRecords

NewOperatorRecords returns an OperatorRecords.

func (*OperatorRecords) Get

Get gets the operator and its status.

func (*OperatorRecords) Put

func (o *OperatorRecords) Put(op *operator.Operator)

Put puts the operator and its status.

type OperatorWithStatus

type OperatorWithStatus struct {
	Op     *operator.Operator
	Status metapb.OperatorStatus
}

OperatorWithStatus records the operator and its status.

func NewOperatorWithStatus

func NewOperatorWithStatus(op *operator.Operator) *OperatorWithStatus

NewOperatorWithStatus creates an OperatorWithStatus from an operator.

func (*OperatorWithStatus) MarshalJSON

func (o *OperatorWithStatus) MarshalJSON() ([]byte, error)

MarshalJSON returns the status of the operator as a JSON string.
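
A type that implements json.Marshaler can emit a single JSON string instead of the default object form. The sketch below shows the mechanism with local stand-ins: `opStatus` replaces metapb.OperatorStatus, `opWithStatus` replaces OperatorWithStatus, and the text layout inside the string is invented for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// opStatus is a stand-in for metapb.OperatorStatus.
type opStatus int

const (
	statusRunning opStatus = iota
	statusSuccess
)

func (s opStatus) String() string {
	if s == statusSuccess {
		return "SUCCESS"
	}
	return "RUNNING"
}

// opWithStatus is a stand-in for OperatorWithStatus; Desc replaces the operator.
type opWithStatus struct {
	Desc   string
	Status opStatus
}

// MarshalJSON emits one JSON string instead of an object. The exact text
// layout is this sketch's choice.
func (o *opWithStatus) MarshalJSON() ([]byte, error) {
	return json.Marshal(fmt.Sprintf("status: %s, operator: %s", o.Status, o.Desc))
}

// render marshals o and returns the resulting JSON text.
func render(o *opWithStatus) string {
	data, err := json.Marshal(o)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(data)
}

func main() {
	fmt.Println(render(&opWithStatus{Desc: "add-peer", Status: statusSuccess}))
}
```

Because the method has a pointer receiver, json.Marshal only picks it up when given a pointer (or an addressable value).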

type PluginInterface

type PluginInterface struct {
	// contains filtered or unexported fields
}

PluginInterface is used to manage all plugins.

func NewPluginInterface

func NewPluginInterface(logger *zap.Logger) *PluginInterface

NewPluginInterface creates a plugin interface.

func (*PluginInterface) GetFunction

func (p *PluginInterface) GetFunction(path string, funcName string) (plugin.Symbol, error)

GetFunction gets the function named funcName from the plugin (.so) at path.

type RandBuckets

type RandBuckets struct {
	// contains filtered or unexported fields
}

RandBuckets is an implementation of waiting operators.

func NewRandBuckets

func NewRandBuckets() *RandBuckets

NewRandBuckets creates a new RandBuckets.

func (*RandBuckets) GetOperator

func (b *RandBuckets) GetOperator() []*operator.Operator

GetOperator gets an operator from the random buckets.

func (*RandBuckets) ListOperator

func (b *RandBuckets) ListOperator() []*operator.Operator

ListOperator lists all operators in the random buckets.

func (*RandBuckets) PutOperator

func (b *RandBuckets) PutOperator(op *operator.Operator)

PutOperator puts an operator into the random buckets.

type RangeCluster

type RangeCluster struct {
	opt.Cluster
	// contains filtered or unexported fields
}

RangeCluster isolates the cluster by range.

func GenRangeCluster

func GenRangeCluster(group uint64, cluster opt.Cluster, startKey, endKey []byte) *RangeCluster

GenRangeCluster gets a range cluster by specifying start key and end key. The cluster can only know the resources within [startKey, endKey].
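
The range restriction comes down to a lexicographic comparison on byte-slice keys. The sketch below illustrates that check with a hypothetical helper `keyInRange`. The inclusive bounds follow the [startKey, endKey] notation above, but the package's exact boundary handling is not shown on this page and may differ.

```go
package main

import (
	"bytes"
	"fmt"
)

// keyInRange reports whether key lies within [startKey, endKey], comparing
// bytes lexicographically. Inclusive bounds are an assumption taken from the
// doc comment's notation.
func keyInRange(key, startKey, endKey []byte) bool {
	return bytes.Compare(key, startKey) >= 0 && bytes.Compare(key, endKey) <= 0
}

func main() {
	start, end := []byte("b"), []byte("m")
	for _, k := range []string{"a", "b", "hello", "m", "z"} {
		fmt.Println(k, keyInRange([]byte(k), start, end))
	}
}
```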

func (*RangeCluster) GetAverageResourceSize

func (r *RangeCluster) GetAverageResourceSize() int64

GetAverageResourceSize returns the approximate average resource size.

func (*RangeCluster) GetContainer

func (r *RangeCluster) GetContainer(id uint64) *core.CachedContainer

GetContainer searches for a container by ID.

func (*RangeCluster) GetContainers

func (r *RangeCluster) GetContainers() []*core.CachedContainer

GetContainers returns all Containers in the cluster.

func (*RangeCluster) GetFollowerContainers

func (r *RangeCluster) GetFollowerContainers(res *core.CachedResource) []*core.CachedContainer

GetFollowerContainers returns all containers that contain the resource's follower peers.

func (*RangeCluster) GetLeaderContainer

func (r *RangeCluster) GetLeaderContainer(res *core.CachedResource) *core.CachedContainer

GetLeaderContainer returns the container that contains the resource's leader peer.

func (*RangeCluster) GetResourceContainers

func (r *RangeCluster) GetResourceContainers(res *core.CachedResource) []*core.CachedContainer

GetResourceContainers returns all containers that contain the resource's peers.

func (*RangeCluster) GetTolerantSizeRatio

func (r *RangeCluster) GetTolerantSizeRatio() float64

GetTolerantSizeRatio gets the tolerant size ratio.

func (*RangeCluster) RandFollowerResource

func (r *RangeCluster) RandFollowerResource(groupKey string, containerID uint64, ranges []core.KeyRange, opts ...core.ResourceOption) *core.CachedResource

RandFollowerResource returns a random resource that has a follower on the container.

func (*RangeCluster) RandLeaderResource

func (r *RangeCluster) RandLeaderResource(groupKey string, containerID uint64, ranges []core.KeyRange, opts ...core.ResourceOption) *core.CachedResource

RandLeaderResource returns a random resource that has a leader on the container.

func (*RangeCluster) SetTolerantSizeRatio

func (r *RangeCluster) SetTolerantSizeRatio(ratio float64)

SetTolerantSizeRatio sets the tolerant size ratio.

type ResourceScatterer

type ResourceScatterer struct {
	// contains filtered or unexported fields
}

ResourceScatterer scatters resources.

func NewResourceScatterer

func NewResourceScatterer(ctx context.Context, cluster opt.Cluster) *ResourceScatterer

NewResourceScatterer creates a resource scatterer. ResourceScatterer is used by `Lightning`; it scatters the specified resources before importing data.

func (*ResourceScatterer) Put

func (r *ResourceScatterer) Put(peers map[uint64]metapb.Replica, leaderContainerID uint64, group string)

Put records the final distribution in the context, regardless of whether the operator was created.

func (*ResourceScatterer) Scatter

func (r *ResourceScatterer) Scatter(res *core.CachedResource, group string) (*operator.Operator, error)

Scatter relocates the resource. If the group is defined, the leaders of resources in the same group are scattered at the group level instead of the cluster level.

func (*ResourceScatterer) ScatterResources

func (r *ResourceScatterer) ScatterResources(resources map[uint64]*core.CachedResource, failures map[uint64]error, group string, retryLimit int) ([]*operator.Operator, error)

ScatterResources relocates the resources. If the group is defined, the leaders of resources in the same group are scattered at the group level instead of the cluster level. retryLimit is the number of retries allowed if any of the resources fails to relocate during scattering; there is a time.Sleep between retries. failures records the resources that failed to be relocated: each key is a resource ID and each value is the corresponding error.
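
The retry-with-failures-map contract can be sketched in isolation. The code below is a simplified illustration under stated assumptions: `scatterAll`, `scatterOne` and `flaky` are hypothetical names, resources are reduced to IDs, and the sleep between retries is omitted to keep the example fast.

```go
package main

import (
	"errors"
	"fmt"
)

// scatterAll tries scatterOne for every ID, then retries only the failed IDs
// up to retryLimit more times. It returns the IDs that succeeded; failures
// ends up holding the last error of every ID that never succeeded.
// All names are hypothetical, and the sleep between retries is omitted.
func scatterAll(ids []uint64, scatterOne func(uint64) error, failures map[uint64]error, retryLimit int) []uint64 {
	var done []uint64
	pending := ids
	for attempt := 0; attempt <= retryLimit && len(pending) > 0; attempt++ {
		var next []uint64
		for _, id := range pending {
			if err := scatterOne(id); err != nil {
				failures[id] = err
				next = append(next, id)
				continue
			}
			delete(failures, id)
			done = append(done, id)
		}
		pending = next
	}
	return done
}

// flaky returns a scatter function for which ID 2 fails once and ID 3 always fails.
func flaky() func(uint64) error {
	calls := map[uint64]int{}
	return func(id uint64) error {
		calls[id]++
		if id == 3 || (id == 2 && calls[id] == 1) {
			return errors.New("no suitable target")
		}
		return nil
	}
}

func main() {
	failures := map[uint64]error{}
	done := scatterAll([]uint64{1, 2, 3}, flaky(), failures, 2)
	fmt.Println(done, failures)
}
```

Passing the failures map in, rather than returning it, lets the caller accumulate results across several calls.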

func (*ResourceScatterer) ScatterResourcesByID

func (r *ResourceScatterer) ScatterResourcesByID(resourceIDs []uint64, group string, retryLimit int) ([]*operator.Operator, map[uint64]error, error)

ScatterResourcesByID scatters the resources with the given IDs by calling ScatterResources.

func (*ResourceScatterer) ScatterResourcesByRange

func (r *ResourceScatterer) ScatterResourcesByRange(resGroup uint64, startKey, endKey []byte, group string, retryLimit int) ([]*operator.Operator, map[uint64]error, error)

ScatterResourcesByRange scatters the resources in the given key range by calling ScatterResources.

type ResourceSplitter

type ResourceSplitter struct {
	// contains filtered or unexported fields
}

ResourceSplitter handles splitting resources.

func NewResourceSplitter

func NewResourceSplitter(cluster opt.Cluster, handler SplitResourcesHandler) *ResourceSplitter

NewResourceSplitter returns a resource splitter.

func (*ResourceSplitter) SplitResources

func (r *ResourceSplitter) SplitResources(ctx context.Context, group uint64, splitKeys [][]byte, retryLimit int) (int, []uint64)

SplitResources splits resources at the given split keys.

type Scheduler

type Scheduler interface {
	http.Handler
	GetName() string
	// GetType should be in accordance with the name passed to schedule.RegisterScheduler()
	GetType() string
	EncodeConfig() ([]byte, error)
	GetMinInterval() time.Duration
	GetNextInterval(interval time.Duration) time.Duration
	Prepare(cluster opt.Cluster) error
	Cleanup(cluster opt.Cluster)
	Schedule(cluster opt.Cluster) []*operator.Operator
	IsScheduleAllowed(cluster opt.Cluster) bool
}

Scheduler is an interface to schedule resources.

func CreateScheduler

func CreateScheduler(typ string, opController *OperatorController, storage storage.Storage, dec ConfigDecoder) (Scheduler, error)

CreateScheduler creates a scheduler with the registered creator func.

type SplitResourcesHandler

type SplitResourcesHandler interface {
	SplitResourceByKeys(res *core.CachedResource, splitKeys [][]byte) error
	ScanResourcesByKeyRange(group uint64, groupKeys *resourceGroupKeys, results *splitKeyResults)
}

SplitResourcesHandler is used to handle resource splitting.

func NewSplitResourcesHandler

func NewSplitResourcesHandler(cluster opt.Cluster, oc *OperatorController) SplitResourcesHandler

NewSplitResourcesHandler returns a SplitResourcesHandler.

type WaitingOperator

type WaitingOperator interface {
	PutOperator(op *operator.Operator)
	GetOperator() []*operator.Operator
	ListOperator() []*operator.Operator
}

WaitingOperator is an interface of waiting operators.
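
As a rough illustration of what satisfying this interface involves, the sketch below implements it with a plain FIFO queue. `op` is a local stand-in for *operator.Operator and `fifoQueue` is hypothetical; it is not how RandBuckets selects operators.

```go
package main

import "fmt"

// op is a stand-in for *operator.Operator.
type op struct{ Desc string }

// waitingOperator mirrors the documented WaitingOperator interface.
type waitingOperator interface {
	PutOperator(o *op)
	GetOperator() []*op
	ListOperator() []*op
}

// fifoQueue is a hypothetical implementation that hands out one operator
// at a time in insertion order.
type fifoQueue struct{ ops []*op }

func (q *fifoQueue) PutOperator(o *op) { q.ops = append(q.ops, o) }

// GetOperator removes and returns the oldest operator, or nil when empty.
func (q *fifoQueue) GetOperator() []*op {
	if len(q.ops) == 0 {
		return nil
	}
	head := q.ops[0]
	q.ops = q.ops[1:]
	return []*op{head}
}

// ListOperator returns a copy so callers cannot mutate the queue.
func (q *fifoQueue) ListOperator() []*op { return append([]*op(nil), q.ops...) }

func main() {
	var w waitingOperator = &fifoQueue{}
	w.PutOperator(&op{Desc: "transfer-leader"})
	w.PutOperator(&op{Desc: "add-peer"})
	fmt.Println(w.GetOperator()[0].Desc, len(w.ListOperator()))
}
```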

type WaitingOperatorStatus

type WaitingOperatorStatus struct {
	// contains filtered or unexported fields
}

WaitingOperatorStatus is used to limit the count of each kind of operators.

func NewWaitingOperatorStatus

func NewWaitingOperatorStatus() *WaitingOperatorStatus

NewWaitingOperatorStatus creates a new WaitingOperatorStatus.
