scheduler

package
v0.0.0-...-67dc25f
Warning

This package is not in the latest version of its module.

Published: Feb 3, 2020 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Constants

const (
	FairSortPolicy        = 0
	FifoSortPolicy        = 1
	MaxAvailableResources = 2 // node sorting, descending on available resources
	MinAvailableResources = 3 // node sorting, ascending on available resources
)

Variables

This section is empty.

Functions

func GetApplicationIDFromTags

func GetApplicationIDFromTags(tags map[string]string) string

func GetPartitionFromTags

func GetPartitionFromTags(tags map[string]string) string

func SortApplications

func SortApplications(apps []*SchedulingApplication, sortType SortType, globalResource *resources.Resource)

func SortNodes

func SortNodes(nodes []*SchedulingNode, sortType SortType)
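
The two node sorting policies in the constants block can be illustrated with a minimal sketch. This is not the package's implementation: `node`, `sortType` and `sortNodes` are hypothetical stand-ins, and a single integer replaces the real resource type.

```go
package main

import (
	"fmt"
	"sort"
)

// sortType and the two policy values mirror the constants listed in this
// package; the lowercase names are stand-ins for illustration only.
type sortType int32

const (
	maxAvailableResources sortType = 2 // descending on available resources
	minAvailableResources sortType = 3 // ascending on available resources
)

// node is a hypothetical stand-in for SchedulingNode, reduced to a single
// scalar amount of available resource.
type node struct {
	id        string
	available int64
}

// sortNodes orders the slice in place according to the policy.
// Policies it does not know leave the slice untouched.
func sortNodes(nodes []node, policy sortType) {
	switch policy {
	case maxAvailableResources:
		sort.SliceStable(nodes, func(i, j int) bool {
			return nodes[i].available > nodes[j].available
		})
	case minAvailableResources:
		sort.SliceStable(nodes, func(i, j int) bool {
			return nodes[i].available < nodes[j].available
		})
	}
}

func main() {
	nodes := []node{{"n1", 4}, {"n2", 16}, {"n3", 8}}
	sortNodes(nodes, maxAvailableResources)
	fmt.Println(nodes) // [{n2 16} {n3 8} {n1 4}]
	sortNodes(nodes, minAvailableResources)
	fmt.Println(nodes) // [{n1 4} {n3 8} {n2 16}]
}
```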

func SortQueue

func SortQueue(queues []*SchedulingQueue, sortType SortType)

Types

type ClusterSchedulingContext

type ClusterSchedulingContext struct {
	// contains filtered or unexported fields
}

func NewClusterSchedulingContext

func NewClusterSchedulingContext() *ClusterSchedulingContext

func (*ClusterSchedulingContext) AddSchedulingApplication

func (csc *ClusterSchedulingContext) AddSchedulingApplication(schedulingApp *SchedulingApplication) error

func (*ClusterSchedulingContext) GetSchedulingApplication

func (csc *ClusterSchedulingContext) GetSchedulingApplication(appID string, partitionName string) *SchedulingApplication

func (*ClusterSchedulingContext) GetSchedulingNode

func (csc *ClusterSchedulingContext) GetSchedulingNode(nodeID, partitionName string) *SchedulingNode

Get a scheduling node based on its name and partition. Returns nil if the partition or node cannot be found.

func (*ClusterSchedulingContext) GetSchedulingQueue

func (csc *ClusterSchedulingContext) GetSchedulingQueue(queueName string, partitionName string) *SchedulingQueue

Visible by tests

func (*ClusterSchedulingContext) NeedPreemption

func (csc *ClusterSchedulingContext) NeedPreemption() bool

func (*ClusterSchedulingContext) RemoveSchedulingApplication

func (csc *ClusterSchedulingContext) RemoveSchedulingApplication(appID string, partitionName string) (*SchedulingApplication, error)

func (*ClusterSchedulingContext) RemoveSchedulingPartitionsByRMId

func (csc *ClusterSchedulingContext) RemoveSchedulingPartitionsByRMId(rmID string)

type DRFPreemptionPolicy

type DRFPreemptionPolicy struct {
}

Preemption policy based on DRF (Dominant Resource Fairness).

func (*DRFPreemptionPolicy) DoPreemption

func (m *DRFPreemptionPolicy) DoPreemption(scheduler *Scheduler)

type DefaultNodeIterator

type DefaultNodeIterator struct {
	// contains filtered or unexported fields
}

Default iterator, wraps the base iterator. Iterates over the list from the start, position zero, to the end.

func NewDefaultNodeIterator

func NewDefaultNodeIterator(schedulerNodes []*SchedulingNode) *DefaultNodeIterator

Create a new default iterator

func (*DefaultNodeIterator) HasNext

func (bi *DefaultNodeIterator) HasNext() bool

HasNext returns true if there is a next element in the array. Returns false if there are no more elements or the list is empty.

func (*DefaultNodeIterator) Next

func (bi *DefaultNodeIterator) Next() *SchedulingNode

Next returns the next element and advances to the next element in the array. Returns nil at the end of iteration.

func (*DefaultNodeIterator) Reset

func (bi *DefaultNodeIterator) Reset()

Reset the iterator to start from the beginning
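
The HasNext, Next and Reset contract described above can be sketched as follows. The `node` and `defaultIterator` types are hypothetical stand-ins written for this example, not the package's own code.

```go
package main

import "fmt"

// node is a hypothetical stand-in for SchedulingNode.
type node struct{ id string }

// defaultIterator walks a fixed slice from index zero to the end.
type defaultIterator struct {
	nodes []*node
	pos   int // index of the element the next call to Next returns
}

func newDefaultIterator(nodes []*node) *defaultIterator {
	return &defaultIterator{nodes: nodes}
}

// HasNext reports whether Next would return a node.
// It is false for an empty list and after the last element.
func (it *defaultIterator) HasNext() bool {
	return it.pos < len(it.nodes)
}

// Next returns the current node and advances, or nil once exhausted.
func (it *defaultIterator) Next() *node {
	if !it.HasNext() {
		return nil
	}
	n := it.nodes[it.pos]
	it.pos++
	return n
}

// Reset rewinds the iterator to position zero.
func (it *defaultIterator) Reset() {
	it.pos = 0
}

func main() {
	it := newDefaultIterator([]*node{{"n1"}, {"n2"}})
	for it.HasNext() {
		fmt.Println(it.Next().id)
	}
	fmt.Println(it.Next() == nil) // true
	it.Reset()
	fmt.Println(it.Next().id) // n1
}
```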

type NodeIterator

type NodeIterator interface {
	// returns true if there are more values to iterate over
	HasNext() (ok bool)
	// returns the next node from the iterator
	Next() (node *SchedulingNode)
	// reset the iterator to a clean state
	Reset()
}

NodeIterator iterates over a list of nodes based on the defined policy

type PartitionManager

type PartitionManager struct {
	// contains filtered or unexported fields
}

func (PartitionManager) Run

func (manager PartitionManager) Run()

Run the manager for the partition. The manager has two tasks:
- clean up the managed queues that are empty and removed from the configuration
- remove empty unmanaged queues

When the manager exits, the partition is removed from the system and must be cleaned up.

func (PartitionManager) Stop

func (manager PartitionManager) Stop()

Set the flag that will allow the manager to exit. No locking is needed as there is just one place where this is called, and that place is already locked.

type PartitionSchedulingContext

type PartitionSchedulingContext struct {
	Root *SchedulingQueue // start of the scheduling queue hierarchy
	RmID string           // the RM the partition belongs to
	Name string           // name of the partition (logging mainly)
	// contains filtered or unexported fields
}

func (*PartitionSchedulingContext) AddSchedulingApplication

func (psc *PartitionSchedulingContext) AddSchedulingApplication(schedulingApp *SchedulingApplication) error

Add a new application to the scheduling partition.

func (*PartitionSchedulingContext) GetQueue

func (psc *PartitionSchedulingContext) GetQueue(name string) *SchedulingQueue

Get the queue from the structure based on the fully qualified name. Wrapper around the unlocked version getQueue(). Visible by tests.

func (*PartitionSchedulingContext) RemoveSchedulingApplication

func (psc *PartitionSchedulingContext) RemoveSchedulingApplication(appID string) (*SchedulingApplication, error)

Remove the application from the scheduling partition.

type PreemptionPolicy

type PreemptionPolicy interface {
	DoPreemption(scheduler *Scheduler)
}

type RoundRobinNodeIterator

type RoundRobinNodeIterator struct {
	// contains filtered or unexported fields
}

Random iterator, wraps the base iterator. Iterates over the list from a random starting position in the list. The iterator automatically wraps at the end of the list.

func NewRoundRobinNodeIterator

func NewRoundRobinNodeIterator(schedulerNodes []*SchedulingNode) *RoundRobinNodeIterator

The starting point is randomised in the slice.

func (*RoundRobinNodeIterator) HasNext

func (bi *RoundRobinNodeIterator) HasNext() bool

HasNext returns true if there is a next element in the array. Returns false if there are no more elements or the list is empty.

func (*RoundRobinNodeIterator) Next

Next returns the next element and advances to the next element in the array. Returns nil at the end of iteration.

func (*RoundRobinNodeIterator) Reset

func (ri *RoundRobinNodeIterator) Reset()
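
A minimal sketch of an iterator that starts at a random position and wraps at the end of the list is shown here. The types are hypothetical stand-ins, and the choice to keep the same starting offset after Reset is an assumption made for the example, since the page does not document what Reset does for this type.

```go
package main

import (
	"fmt"
	"math/rand"
)

// node is a hypothetical stand-in for SchedulingNode.
type node struct{ id string }

// roundRobinIterator visits every node exactly once, beginning at a random
// offset and wrapping around at the end of the slice.
type roundRobinIterator struct {
	nodes []*node
	start int // random offset chosen at construction
	count int // number of nodes handed out so far
}

func newRoundRobinIterator(nodes []*node) *roundRobinIterator {
	it := &roundRobinIterator{nodes: nodes}
	if len(nodes) > 0 {
		it.start = rand.Intn(len(nodes))
	}
	return it
}

// HasNext is true until every node has been returned once.
func (it *roundRobinIterator) HasNext() bool {
	return it.count < len(it.nodes)
}

// Next returns the node at the wrapped position, or nil once exhausted.
func (it *roundRobinIterator) Next() *node {
	if !it.HasNext() {
		return nil
	}
	n := it.nodes[(it.start+it.count)%len(it.nodes)]
	it.count++
	return n
}

// Reset restarts the pass. Assumption: the start offset is kept.
func (it *roundRobinIterator) Reset() {
	it.count = 0
}

func main() {
	it := newRoundRobinIterator([]*node{{"n1"}, {"n2"}, {"n3"}})
	for it.HasNext() {
		fmt.Println(it.Next().id) // order depends on the random start
	}
}
```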

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

The Scheduler is responsible for getting status from the SchedulerCache and sending allocation / release proposals back to the cache.

The Scheduler may maintain its own local status, which can differ from the SchedulerCache.

func NewScheduler

func NewScheduler(clusterInfo *cache.ClusterInfo) *Scheduler

func (*Scheduler) GetClusterSchedulingContext

func (m *Scheduler) GetClusterSchedulingContext() *ClusterSchedulingContext

Visible by tests

func (*Scheduler) HandleEvent

func (m *Scheduler) HandleEvent(ev interface{})

Implement methods for Scheduler events

func (*Scheduler) SingleStepPreemption

func (m *Scheduler) SingleStepPreemption()

Visible by tests

func (*Scheduler) SingleStepScheduleAllocTest

func (m *Scheduler) SingleStepScheduleAllocTest(nAlloc int)

Visible by tests

func (*Scheduler) StartService

func (m *Scheduler) StartService(handlers handler.EventHandlers, manualSchedule bool)

Start service

type SchedulingAllocation

type SchedulingAllocation struct {
	SchedulingAsk *SchedulingAllocationAsk
	NumAllocation int32
	NodeID        string
	Releases      []*commonevents.ReleaseAllocation
	PartitionName string
}

func NewSchedulingAllocation

func NewSchedulingAllocation(ask *SchedulingAllocationAsk, nodeID string) *SchedulingAllocation

func (*SchedulingAllocation) String

func (m *SchedulingAllocation) String() string

type SchedulingAllocationAsk

type SchedulingAllocationAsk struct {
	// Original ask
	AskProto *si.AllocationAsk

	// Extracted info
	AllocatedResource  *resources.Resource
	PendingRepeatAsk   int32
	ApplicationID      string
	PartitionName      string
	NormalizedPriority int32
	QueueName          string
	// contains filtered or unexported fields
}

func ConvertFromAllocation

func ConvertFromAllocation(allocation *si.Allocation, rmID string) *SchedulingAllocationAsk

func NewSchedulingAllocationAsk

func NewSchedulingAllocationAsk(ask *si.AllocationAsk) *SchedulingAllocationAsk

func (*SchedulingAllocationAsk) AddPendingAskRepeat

func (m *SchedulingAllocationAsk) AddPendingAskRepeat(delta int32) bool

Add delta to the pending ask.

If original_pending + delta >= 0, update the internal pending ask and return true.
If original_pending + delta < 0, return false and keep original_pending unchanged.
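
The rule above can be sketched as a small method. The `ask` type is a hypothetical stand-in that holds only the counter, and any locking the real type may do is left out.

```go
package main

import "fmt"

// ask is a hypothetical stand-in that holds only the pending repeat counter.
type ask struct {
	pendingRepeat int32
}

// addPendingRepeat applies delta only when the result stays non-negative.
// It returns false and leaves the counter unchanged otherwise.
func (a *ask) addPendingRepeat(delta int32) bool {
	if a.pendingRepeat+delta < 0 {
		return false
	}
	a.pendingRepeat += delta
	return true
}

func main() {
	a := &ask{pendingRepeat: 2}

	ok := a.addPendingRepeat(-1)
	fmt.Println(ok, a.pendingRepeat) // true 1

	ok = a.addPendingRepeat(-5)
	fmt.Println(ok, a.pendingRepeat) // false 1

	ok = a.addPendingRepeat(3)
	fmt.Println(ok, a.pendingRepeat) // true 4
}
```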

type SchedulingApplication

type SchedulingApplication struct {
	ApplicationInfo      *cache.ApplicationInfo
	Requests             *SchedulingRequests
	MayAllocatedResource *resources.Resource // Maybe allocated, set by scheduler
	// contains filtered or unexported fields
}

func NewSchedulingApplication

func NewSchedulingApplication(appInfo *cache.ApplicationInfo) *SchedulingApplication

type SchedulingNode

type SchedulingNode struct {
	NodeID string
	// contains filtered or unexported fields
}

func NewSchedulingNode

func NewSchedulingNode(info *cache.NodeInfo) *SchedulingNode

func (*SchedulingNode) CheckAllocateConditions

func (sn *SchedulingNode) CheckAllocateConditions(allocID string) bool

Check the pre-allocation conditions. The pre-allocation conditions are implemented via plugins in the shim. If no plugins are implemented, the check returns true. If multiple plugins are implemented, the first failure stops the checks, so the caller must not rely on all plugins being executed. This is a lock-free call as it does not change the node, and multiple predicate checks could be run at the same time.
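
The short-circuit behaviour described above can be sketched as a loop over predicate functions. The `predicate` type and `checkConditions` function are hypothetical; the real plugin interface lives in the shim and is not shown on this page.

```go
package main

import "fmt"

// predicate is a hypothetical plugin hook: it reports whether the
// allocation may be placed on the node.
type predicate func(allocID, nodeID string) bool

// checkConditions runs the plugins in order and stops at the first failure.
// With no plugins registered it returns true.
func checkConditions(plugins []predicate, allocID, nodeID string) bool {
	for _, p := range plugins {
		if !p(allocID, nodeID) {
			return false // later plugins are not executed
		}
	}
	return true
}

func main() {
	calls := 0
	pass := func(string, string) bool { calls++; return true }
	fail := func(string, string) bool { calls++; return false }

	fmt.Println(checkConditions(nil, "alloc-1", "node-1")) // true

	ok := checkConditions([]predicate{pass, fail, pass}, "alloc-1", "node-1")
	fmt.Println(ok, calls) // false 2
}
```

The counter in the usage shows why callers cannot rely on every plugin running: the third predicate is never called.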

func (*SchedulingNode) CheckAndAllocateResource

func (sn *SchedulingNode) CheckAndAllocateResource(delta *resources.Resource, preemptionPhase bool) bool

Check and update allocating resources of the scheduling node. If the proposed allocation fits in the available resources, taking into account resources marked for preemption if applicable, the allocating resources are updated and true is returned. If the proposed allocation does not fit false is returned and no changes are made.
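
A minimal sketch of this check-then-update step follows. The available amount is assumed here to be total minus allocated minus allocating, plus the resources marked for preemption during the preemption phase; that formula, the `resource` map type and the field names are assumptions made for illustration, not the package's definitions.

```go
package main

import "fmt"

// resource is a hypothetical stand-in for resources.Resource:
// a quantity per resource name.
type resource map[string]int64

// node is a hypothetical stand-in for SchedulingNode.
type node struct {
	total      resource // capacity of the node
	allocated  resource // confirmed allocations
	allocating resource // proposed, not yet confirmed
	preempting resource // marked for preemption
}

// checkAndAllocate adds delta to the allocating resources if it fits.
// On failure nothing is changed and false is returned.
func (n *node) checkAndAllocate(delta resource, preemptionPhase bool) bool {
	for name, want := range delta {
		avail := n.total[name] - n.allocated[name] - n.allocating[name]
		if preemptionPhase {
			avail += n.preempting[name]
		}
		if want > avail {
			return false
		}
	}
	if n.allocating == nil {
		n.allocating = resource{}
	}
	for name, want := range delta {
		n.allocating[name] += want
	}
	return true
}

func main() {
	n := &node{
		total:      resource{"memory": 10},
		allocated:  resource{"memory": 6},
		preempting: resource{"memory": 3},
	}
	ask := resource{"memory": 5}

	ok := n.checkAndAllocate(ask, false)
	fmt.Println(ok, n.allocating["memory"]) // false 0

	ok = n.checkAndAllocate(ask, true)
	fmt.Println(ok, n.allocating["memory"]) // true 5
}
```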

func (*SchedulingNode) CheckBasicAllocateCondition

func (sn *SchedulingNode) CheckBasicAllocateCondition(delta *resources.Resource) bool

TODO how to handle preemption?

func (*SchedulingNode) GetAllocatedResource

func (sn *SchedulingNode) GetAllocatedResource() *resources.Resource

Get the allocated resource on this node. These resources are just the confirmed allocations (tracked in the cache node). This does not lock the cache node as it will take its own lock.

type SchedulingQueue

type SchedulingQueue struct {
	Name                string              // Fully qualified path for the queue
	CachedQueueInfo     *cache.QueueInfo    // link back to the queue in the cache
	ProposingResource   *resources.Resource // Resources added for proposing; used by the queue sort during candidate selection
	ApplicationSortType SortType            // How applications are sorted (leaf queue only)
	QueueSortType       SortType            // How sub queues are sorted (parent queue only)
	// contains filtered or unexported fields
}

Represents a queue inside the scheduler.

func NewSchedulingQueueInfo

func NewSchedulingQueueInfo(cacheQueueInfo *cache.QueueInfo, parent *SchedulingQueue) *SchedulingQueue

func (*SchedulingQueue) AddSchedulingApplication

func (sq *SchedulingQueue) AddSchedulingApplication(app *SchedulingApplication)

Add scheduling app to the queue

func (*SchedulingQueue) CheckAdminAccess

func (sq *SchedulingQueue) CheckAdminAccess(user security.UserGroup) bool

Check if the user has access to the queue for admin actions. Calls the cache queue which is doing the real work.

func (*SchedulingQueue) CheckSubmitAccess

func (sq *SchedulingQueue) CheckSubmitAccess(user security.UserGroup) bool

Check if the user has access to the queue to submit an application. This will check the submit ACL and the admin ACL. Calls the cache queue which is doing the real work.

func (*SchedulingQueue) DecPendingResource

func (sq *SchedulingQueue) DecPendingResource(delta *resources.Resource)

Remove pending resource of this queue

func (*SchedulingQueue) GetAllocatingResource

func (sq *SchedulingQueue) GetAllocatingResource() *resources.Resource

func (*SchedulingQueue) GetCopyOfChildren

func (sq *SchedulingQueue) GetCopyOfChildren() map[string]*SchedulingQueue

Get a copy of the child queues

func (*SchedulingQueue) GetPendingResource

func (sq *SchedulingQueue) GetPendingResource() *resources.Resource

func (*SchedulingQueue) IncAllocatingResource

func (sq *SchedulingQueue) IncAllocatingResource(newAlloc *resources.Resource)

func (*SchedulingQueue) IncPendingResource

func (sq *SchedulingQueue) IncPendingResource(delta *resources.Resource)

Update pending resource of this queue

func (*SchedulingQueue) RemoveQueue

func (sq *SchedulingQueue) RemoveQueue() bool

Remove the queue from the structure. Since nothing is allocated, there should not be anything referencing this queue any more. The real removal deletes the queue from the parent's child list and uses a read lock on the queue.

func (*SchedulingQueue) RemoveSchedulingApplication

func (sq *SchedulingQueue) RemoveSchedulingApplication(app *SchedulingApplication)

Remove scheduling app and pending resource of this queue and update the parent queues

func (*SchedulingQueue) SetAllocatingResource

func (sq *SchedulingQueue) SetAllocatingResource(newAlloc *resources.Resource)

type SchedulingRequests

type SchedulingRequests struct {
	// contains filtered or unexported fields
}

Responsibilities of this type:
- Hold pending scheduling requests.
- Pre-aggregate scheduling requests by pre-defined keys, and calculate pending resources.

func NewSchedulingRequests

func NewSchedulingRequests() *SchedulingRequests

func (*SchedulingRequests) AddAllocationAsk

func (m *SchedulingRequests) AddAllocationAsk(ask *SchedulingAllocationAsk) (*resources.Resource, error)

Add a new ask or replace an existing one. Returns the delta of the pending resource, and an error if anything goes wrong.

func (*SchedulingRequests) CleanupAllocationAsks

func (m *SchedulingRequests) CleanupAllocationAsks() *resources.Resource

Remove all allocation asks. Returns the change of the pending resource, or nil when there are no asks.

func (*SchedulingRequests) GetPendingResource

func (m *SchedulingRequests) GetPendingResource() *resources.Resource

func (*SchedulingRequests) GetSchedulingAllocationAsk

func (m *SchedulingRequests) GetSchedulingAllocationAsk(allocationKey string) *SchedulingAllocationAsk

func (*SchedulingRequests) RemoveAllocationAsk

func (m *SchedulingRequests) RemoveAllocationAsk(allocationKey string) (*resources.Resource, *SchedulingAllocationAsk)

Remove an allocation ask by key. Returns (change of pending resource, ask), or (nil, nil) if the key cannot be found.

func (*SchedulingRequests) UpdateAllocationAskRepeat

func (m *SchedulingRequests) UpdateAllocationAskRepeat(allocationKey string, delta int32) (*resources.Resource, error)

Update the repeat count of an AllocationAsk. When delta > 0 the repeat is increased by delta; when delta < 0 it is decreased by -delta. Returns an error when allocationKey does not exist or an illegal delta is specified. Returns the change of pending resources, which is used to update queues, applications, etc.
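
The relationship between the repeat delta and the returned change in pending resources can be sketched as below. The types are hypothetical stand-ins, a single integer replaces the resource type, and treating a delta that would make the repeat negative as the illegal case is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// ask is a hypothetical stand-in for SchedulingAllocationAsk.
type ask struct {
	perRepeat     int64 // resource needed for one repeat (single scalar)
	pendingRepeat int32
}

// requests is a hypothetical stand-in for SchedulingRequests.
type requests struct {
	asks    map[string]*ask
	pending int64 // total pending resource over all asks
}

// updateRepeat changes the repeat of one ask and returns the resulting
// change in pending resource, so callers can propagate it further.
func (r *requests) updateRepeat(key string, delta int32) (int64, error) {
	a, ok := r.asks[key]
	if !ok {
		return 0, errors.New("allocation key not found: " + key)
	}
	if a.pendingRepeat+delta < 0 {
		return 0, errors.New("delta would make the repeat negative")
	}
	a.pendingRepeat += delta
	change := int64(delta) * a.perRepeat
	r.pending += change
	return change, nil
}

func main() {
	r := &requests{
		asks:    map[string]*ask{"ask-1": {perRepeat: 2, pendingRepeat: 3}},
		pending: 6,
	}

	change, err := r.updateRepeat("ask-1", 2)
	fmt.Println(change, err, r.pending) // 4 <nil> 10

	_, err = r.updateRepeat("ask-1", -10)
	fmt.Println(err != nil, r.pending) // true 10

	_, err = r.updateRepeat("missing", 1)
	fmt.Println(err != nil) // true
}
```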

type SortType

type SortType int32

Sort type for queues, apps, nodes etc.
