Documentation ¶
Index ¶
- Constants
- func GetApplicationIDFromTags(tags map[string]string) string
- func GetPartitionFromTags(tags map[string]string) string
- func SortApplications(apps []*SchedulingApplication, sortType SortType, ...)
- func SortNodes(nodes []*SchedulingNode, sortType SortType)
- func SortQueue(queues []*SchedulingQueue, sortType SortType)
- type ClusterSchedulingContext
- func (csc *ClusterSchedulingContext) AddSchedulingApplication(schedulingApp *SchedulingApplication) error
- func (csc *ClusterSchedulingContext) GetSchedulingApplication(appID string, partitionName string) *SchedulingApplication
- func (csc *ClusterSchedulingContext) GetSchedulingNode(nodeID, partitionName string) *SchedulingNode
- func (csc *ClusterSchedulingContext) GetSchedulingQueue(queueName string, partitionName string) *SchedulingQueue
- func (csc *ClusterSchedulingContext) NeedPreemption() bool
- func (csc *ClusterSchedulingContext) RemoveSchedulingApplication(appID string, partitionName string) (*SchedulingApplication, error)
- func (csc *ClusterSchedulingContext) RemoveSchedulingPartitionsByRMId(rmID string)
- type DRFPreemptionPolicy
- type DefaultNodeIterator
- type NodeIterator
- type PartitionManager
- type PartitionSchedulingContext
- type PreemptionPolicy
- type RoundRobinNodeIterator
- type Scheduler
- func (m *Scheduler) GetClusterSchedulingContext() *ClusterSchedulingContext
- func (m *Scheduler) HandleEvent(ev interface{})
- func (m *Scheduler) SingleStepPreemption()
- func (m *Scheduler) SingleStepScheduleAllocTest(nAlloc int)
- func (m *Scheduler) StartService(handlers handler.EventHandlers, manualSchedule bool)
- type SchedulingAllocation
- type SchedulingAllocationAsk
- type SchedulingApplication
- type SchedulingNode
- func (sn *SchedulingNode) CheckAllocateConditions(allocID string) bool
- func (sn *SchedulingNode) CheckAndAllocateResource(delta *resources.Resource, preemptionPhase bool) bool
- func (sn *SchedulingNode) CheckBasicAllocateCondition(delta *resources.Resource) bool
- func (sn *SchedulingNode) GetAllocatedResource() *resources.Resource
- type SchedulingQueue
- func (sq *SchedulingQueue) AddSchedulingApplication(app *SchedulingApplication)
- func (sq *SchedulingQueue) CheckAdminAccess(user security.UserGroup) bool
- func (sq *SchedulingQueue) CheckSubmitAccess(user security.UserGroup) bool
- func (sq *SchedulingQueue) DecPendingResource(delta *resources.Resource)
- func (sq *SchedulingQueue) GetAllocatingResource() *resources.Resource
- func (sq *SchedulingQueue) GetCopyOfChildren() map[string]*SchedulingQueue
- func (sq *SchedulingQueue) GetPendingResource() *resources.Resource
- func (sq *SchedulingQueue) IncAllocatingResource(newAlloc *resources.Resource)
- func (sq *SchedulingQueue) IncPendingResource(delta *resources.Resource)
- func (sq *SchedulingQueue) RemoveQueue() bool
- func (sq *SchedulingQueue) RemoveSchedulingApplication(app *SchedulingApplication)
- func (sq *SchedulingQueue) SetAllocatingResource(newAlloc *resources.Resource)
- type SchedulingRequests
- func (m *SchedulingRequests) AddAllocationAsk(ask *SchedulingAllocationAsk) (*resources.Resource, error)
- func (m *SchedulingRequests) CleanupAllocationAsks() *resources.Resource
- func (m *SchedulingRequests) GetPendingResource() *resources.Resource
- func (m *SchedulingRequests) GetSchedulingAllocationAsk(allocationKey string) *SchedulingAllocationAsk
- func (m *SchedulingRequests) RemoveAllocationAsk(allocationKey string) (*resources.Resource, *SchedulingAllocationAsk)
- func (m *SchedulingRequests) UpdateAllocationAskRepeat(allocationKey string, delta int32) (*resources.Resource, error)
- type SortType
Constants ¶
const (
	FairSortPolicy        = 0
	FifoSortPolicy        = 1
	MaxAvailableResources = 2 // node sorting, descending on available resources
	MinAvailableResources = 3 // node sorting, ascending on available resources
)
Variables ¶
This section is empty.
Functions ¶
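func GetApplicationIDFromTags ¶
func GetApplicationIDFromTags(tags map[string]string) string
func GetPartitionFromTags ¶
func GetPartitionFromTags(tags map[string]string) string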
func SortApplications ¶
func SortApplications(apps []*SchedulingApplication, sortType SortType, globalResource *resources.Resource)
func SortNodes ¶
func SortNodes(nodes []*SchedulingNode, sortType SortType)
func SortQueue ¶
func SortQueue(queues []*SchedulingQueue, sortType SortType)
Types ¶
type ClusterSchedulingContext ¶
type ClusterSchedulingContext struct {
// contains filtered or unexported fields
}
func NewClusterSchedulingContext ¶
func NewClusterSchedulingContext() *ClusterSchedulingContext
func (*ClusterSchedulingContext) AddSchedulingApplication ¶
func (csc *ClusterSchedulingContext) AddSchedulingApplication(schedulingApp *SchedulingApplication) error
func (*ClusterSchedulingContext) GetSchedulingApplication ¶
func (csc *ClusterSchedulingContext) GetSchedulingApplication(appID string, partitionName string) *SchedulingApplication
func (*ClusterSchedulingContext) GetSchedulingNode ¶
func (csc *ClusterSchedulingContext) GetSchedulingNode(nodeID, partitionName string) *SchedulingNode
Get a scheduling node based on its name and partition. Returns nil if the partition or node cannot be found.
func (*ClusterSchedulingContext) GetSchedulingQueue ¶
func (csc *ClusterSchedulingContext) GetSchedulingQueue(queueName string, partitionName string) *SchedulingQueue
Visible by tests
func (*ClusterSchedulingContext) NeedPreemption ¶
func (csc *ClusterSchedulingContext) NeedPreemption() bool
func (*ClusterSchedulingContext) RemoveSchedulingApplication ¶
func (csc *ClusterSchedulingContext) RemoveSchedulingApplication(appID string, partitionName string) (*SchedulingApplication, error)
func (*ClusterSchedulingContext) RemoveSchedulingPartitionsByRMId ¶
func (csc *ClusterSchedulingContext) RemoveSchedulingPartitionsByRMId(rmID string)
type DRFPreemptionPolicy ¶
type DRFPreemptionPolicy struct { }
Preemption policy based on DRF.
func (*DRFPreemptionPolicy) DoPreemption ¶
func (m *DRFPreemptionPolicy) DoPreemption(scheduler *Scheduler)
type DefaultNodeIterator ¶
type DefaultNodeIterator struct {
// contains filtered or unexported fields
}
Default iterator, wraps the base iterator. Iterates over the list from the start, position zero, to end.
func NewDefaultNodeIterator ¶
func NewDefaultNodeIterator(schedulerNodes []*SchedulingNode) *DefaultNodeIterator
Create a new default iterator
func (*DefaultNodeIterator) HasNext ¶
func (bi *DefaultNodeIterator) HasNext() bool
HasNext returns true if there is a next element in the array. Returns false if there are no more elements or list is empty.
func (*DefaultNodeIterator) Next ¶
func (bi *DefaultNodeIterator) Next() *SchedulingNode
Next returns the next element and advances to next element in array. Returns nil at the end of iteration.
type NodeIterator ¶
type NodeIterator interface {
	// returns true if there are more values to iterate over
	HasNext() (ok bool)
	// returns the next node from the iterator
	Next() (node *SchedulingNode)
	// reset the iterator to a clean state
	Reset()
}
NodeIterator iterates over a list of nodes based on the defined policy
type PartitionManager ¶
type PartitionManager struct {
// contains filtered or unexported fields
}
func (PartitionManager) Run ¶
func (manager PartitionManager) Run()
Run the manager for the partition. The manager has two tasks:
- clean up the managed queues that are empty and have been removed from the configuration
- remove empty unmanaged queues
When the manager exits, the partition is removed from the system and must be cleaned up.
func (PartitionManager) Stop ¶
func (manager PartitionManager) Stop()
Set the flag that will allow the manager to exit. No locking is needed because this is called from only one place, which already holds the lock.
type PartitionSchedulingContext ¶
type PartitionSchedulingContext struct {
	Root *SchedulingQueue // start of the scheduling queue hierarchy
	RmID string           // the RM the partition belongs to
	Name string           // name of the partition (logging mainly)
	// contains filtered or unexported fields
}
func (*PartitionSchedulingContext) AddSchedulingApplication ¶
func (psc *PartitionSchedulingContext) AddSchedulingApplication(schedulingApp *SchedulingApplication) error
Add a new application to the scheduling partition.
func (*PartitionSchedulingContext) GetQueue ¶
func (psc *PartitionSchedulingContext) GetQueue(name string) *SchedulingQueue
Get the queue from the structure based on the fully qualified name. This is a wrapper around the unlocked version, getQueue(). Visible by tests.
func (*PartitionSchedulingContext) RemoveSchedulingApplication ¶
func (psc *PartitionSchedulingContext) RemoveSchedulingApplication(appID string) (*SchedulingApplication, error)
Remove the application from the scheduling partition.
type PreemptionPolicy ¶
type PreemptionPolicy interface {
DoPreemption(scheduler *Scheduler)
}
type RoundRobinNodeIterator ¶
type RoundRobinNodeIterator struct {
// contains filtered or unexported fields
}
Round robin iterator, wraps the base iterator. Iterates over the list from a random starting position in the list. The iterator automatically wraps at the end of the list.
func NewRoundRobinNodeIterator ¶
func NewRoundRobinNodeIterator(schedulerNodes []*SchedulingNode) *RoundRobinNodeIterator
The starting point is randomised in the slice.
func (*RoundRobinNodeIterator) HasNext ¶
func (bi *RoundRobinNodeIterator) HasNext() bool
HasNext returns true if there is a next element in the array. Returns false if there are no more elements or list is empty.
func (*RoundRobinNodeIterator) Next ¶
func (ri *RoundRobinNodeIterator) Next() *SchedulingNode
Next returns the next element and advances to next element in array. Returns nil at the end of iteration.
func (*RoundRobinNodeIterator) Reset ¶
func (ri *RoundRobinNodeIterator) Reset()
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
The responsibility of this class is to get the status from the SchedulerCache and to send allocation/release proposals back to the cache.
The Scheduler may maintain its own local status, which can differ from the SchedulerCache.
func NewScheduler ¶
func NewScheduler(clusterInfo *cache.ClusterInfo) *Scheduler
func (*Scheduler) GetClusterSchedulingContext ¶
func (m *Scheduler) GetClusterSchedulingContext() *ClusterSchedulingContext
Visible by tests
func (*Scheduler) HandleEvent ¶
func (m *Scheduler) HandleEvent(ev interface{})
Implement methods for Scheduler events
func (*Scheduler) SingleStepPreemption ¶
func (m *Scheduler) SingleStepPreemption()
Visible by tests
func (*Scheduler) SingleStepScheduleAllocTest ¶
func (m *Scheduler) SingleStepScheduleAllocTest(nAlloc int)
Visible by tests
func (*Scheduler) StartService ¶
func (m *Scheduler) StartService(handlers handler.EventHandlers, manualSchedule bool)
Start service
type SchedulingAllocation ¶
type SchedulingAllocation struct { SchedulingAsk *SchedulingAllocationAsk NumAllocation int32 NodeID string Releases []*commonevents.ReleaseAllocation PartitionName string }
func NewSchedulingAllocation ¶
func NewSchedulingAllocation(ask *SchedulingAllocationAsk, nodeID string) *SchedulingAllocation
func (*SchedulingAllocation) String ¶
func (m *SchedulingAllocation) String() string
type SchedulingAllocationAsk ¶
type SchedulingAllocationAsk struct {
	// Original ask
	AskProto *si.AllocationAsk

	// Extracted info
	AllocatedResource  *resources.Resource
	PendingRepeatAsk   int32
	ApplicationID      string
	PartitionName      string
	NormalizedPriority int32
	QueueName          string
	// contains filtered or unexported fields
}
func ConvertFromAllocation ¶
func ConvertFromAllocation(allocation *si.Allocation, rmID string) *SchedulingAllocationAsk
func NewSchedulingAllocationAsk ¶
func NewSchedulingAllocationAsk(ask *si.AllocationAsk) *SchedulingAllocationAsk
func (*SchedulingAllocationAsk) AddPendingAskRepeat ¶
func (m *SchedulingAllocationAsk) AddPendingAskRepeat(delta int32) bool
Add delta to the pending ask repeat count.
If original_pending + delta >= 0, the internal pending ask is updated and true is returned. If original_pending + delta < 0, false is returned and original_pending is left unchanged.
type SchedulingApplication ¶
type SchedulingApplication struct {
	ApplicationInfo      *cache.ApplicationInfo
	Requests             *SchedulingRequests
	MayAllocatedResource *resources.Resource // Maybe allocated, set by scheduler
	// contains filtered or unexported fields
}
func NewSchedulingApplication ¶
func NewSchedulingApplication(appInfo *cache.ApplicationInfo) *SchedulingApplication
type SchedulingNode ¶
type SchedulingNode struct {
	NodeID string
	// contains filtered or unexported fields
}
func NewSchedulingNode ¶
func NewSchedulingNode(info *cache.NodeInfo) *SchedulingNode
func (*SchedulingNode) CheckAllocateConditions ¶
func (sn *SchedulingNode) CheckAllocateConditions(allocID string) bool
Check pre-allocation conditions. The pre-allocation conditions are implemented via plugins in the shim. If no plugins are implemented, the check returns true. If multiple plugins are implemented, the first failure stops the checks, so the caller must not rely on all plugins being executed. This is a lock-free call because it does not change the node, and multiple predicate checks could be run at the same time.
func (*SchedulingNode) CheckAndAllocateResource ¶
func (sn *SchedulingNode) CheckAndAllocateResource(delta *resources.Resource, preemptionPhase bool) bool
Check and update allocating resources of the scheduling node. If the proposed allocation fits in the available resources, taking into account resources marked for preemption if applicable, the allocating resources are updated and true is returned. If the proposed allocation does not fit false is returned and no changes are made.
func (*SchedulingNode) CheckBasicAllocateCondition ¶
func (sn *SchedulingNode) CheckBasicAllocateCondition(delta *resources.Resource) bool
TODO how to handle preemption?
func (*SchedulingNode) GetAllocatedResource ¶
func (sn *SchedulingNode) GetAllocatedResource() *resources.Resource
Get the allocated resource on this node. These resources are just the confirmed allocations (tracked in the cache node). This does not lock the cache node as it will take its own lock.
type SchedulingQueue ¶
type SchedulingQueue struct {
	Name                string              // Fully qualified path for the queue
	CachedQueueInfo     *cache.QueueInfo    // link back to the queue in the cache
	ProposingResource   *resources.Resource // How much resource added for proposing, this is used by queue sort when do candidate selection
	ApplicationSortType SortType            // How applications are sorted (leaf queue only)
	QueueSortType       SortType            // How sub queues are sorted (parent queue only)
	// contains filtered or unexported fields
}
Represents Queue inside Scheduler
func NewSchedulingQueueInfo ¶
func NewSchedulingQueueInfo(cacheQueueInfo *cache.QueueInfo, parent *SchedulingQueue) *SchedulingQueue
func (*SchedulingQueue) AddSchedulingApplication ¶
func (sq *SchedulingQueue) AddSchedulingApplication(app *SchedulingApplication)
Add scheduling app to the queue
func (*SchedulingQueue) CheckAdminAccess ¶
func (sq *SchedulingQueue) CheckAdminAccess(user security.UserGroup) bool
Check if the user has access to the queue for admin actions. Calls the cache queue which is doing the real work.
func (*SchedulingQueue) CheckSubmitAccess ¶
func (sq *SchedulingQueue) CheckSubmitAccess(user security.UserGroup) bool
Check if the user has access to the queue to submit an application. This will check the submit ACL and the admin ACL. Calls the cache queue which is doing the real work.
func (*SchedulingQueue) DecPendingResource ¶
func (sq *SchedulingQueue) DecPendingResource(delta *resources.Resource)
Remove pending resource of this queue
func (*SchedulingQueue) GetAllocatingResource ¶
func (sq *SchedulingQueue) GetAllocatingResource() *resources.Resource
func (*SchedulingQueue) GetCopyOfChildren ¶
func (sq *SchedulingQueue) GetCopyOfChildren() map[string]*SchedulingQueue
Get a copy of the child queues
func (*SchedulingQueue) GetPendingResource ¶
func (sq *SchedulingQueue) GetPendingResource() *resources.Resource
func (*SchedulingQueue) IncAllocatingResource ¶
func (sq *SchedulingQueue) IncAllocatingResource(newAlloc *resources.Resource)
func (*SchedulingQueue) IncPendingResource ¶
func (sq *SchedulingQueue) IncPendingResource(delta *resources.Resource)
Update pending resource of this queue
func (*SchedulingQueue) RemoveQueue ¶
func (sq *SchedulingQueue) RemoveQueue() bool
Remove the queue from the structure. Since nothing is allocated, nothing should reference this queue any more. The actual removal deletes the queue from the parent's child list; a read lock is used on the queue.
func (*SchedulingQueue) RemoveSchedulingApplication ¶
func (sq *SchedulingQueue) RemoveSchedulingApplication(app *SchedulingApplication)
Remove scheduling app and pending resource of this queue and update the parent queues
func (*SchedulingQueue) SetAllocatingResource ¶
func (sq *SchedulingQueue) SetAllocatingResource(newAlloc *resources.Resource)
type SchedulingRequests ¶
type SchedulingRequests struct {
// contains filtered or unexported fields
}
Responsibility of this class: - Hold pending scheduling Requests. - Pre-aggregate scheduling Requests by pre-defined keys, and calculate pending resources
func NewSchedulingRequests ¶
func NewSchedulingRequests() *SchedulingRequests
func (*SchedulingRequests) AddAllocationAsk ¶
func (m *SchedulingRequests) AddAllocationAsk(ask *SchedulingAllocationAsk) (*resources.Resource, error)
Add a new allocation ask or replace an existing one. Returns the delta of the pending resource, and an error if anything goes wrong.
func (*SchedulingRequests) CleanupAllocationAsks ¶
func (m *SchedulingRequests) CleanupAllocationAsks() *resources.Resource
Remove all allocation asks. Returns the change of the pending resource, or nil when there are no asks.
func (*SchedulingRequests) GetPendingResource ¶
func (m *SchedulingRequests) GetPendingResource() *resources.Resource
func (*SchedulingRequests) GetSchedulingAllocationAsk ¶
func (m *SchedulingRequests) GetSchedulingAllocationAsk(allocationKey string) *SchedulingAllocationAsk
func (*SchedulingRequests) RemoveAllocationAsk ¶
func (m *SchedulingRequests) RemoveAllocationAsk(allocationKey string) (*resources.Resource, *SchedulingAllocationAsk)
Remove an allocation ask by key. Returns (change of pending resource, ask), or (nil, nil) if the key cannot be found.
func (*SchedulingRequests) UpdateAllocationAskRepeat ¶
func (m *SchedulingRequests) UpdateAllocationAskRepeat(allocationKey string, delta int32) (*resources.Resource, error)
Update the AllocationAsk repeat count: when delta > 0 the repeat is increased by delta, and when delta < 0 it is decreased by -delta. Returns an error when the allocationKey does not exist or an illegal delta is specified. Returns the change of the pending resources, which is used to update queues, applications, etc.
Source Files ¶
- allocator.go
- asks_finder.go
- drf_preemption_policy.go
- drf_preemption_resource_calculator.go
- helper.go
- node_iterator.go
- nodes_usage_monitor.go
- partition_manager.go
- preemptor.go
- requests.go
- scheduler.go
- scheduling_allocation.go
- scheduling_allocation_ask.go
- scheduling_application.go
- scheduling_context.go
- scheduling_metrics.go
- scheduling_node.go
- scheduling_partition.go
- scheduling_queue.go
- sorters.go