Documentation ¶
Index ¶
- Constants
- func GetChildQueueInfos(info *QueueInfo) []dao.QueueDAOInfo
- func UpdateClusterInfoFromConfigFile(clusterInfo *ClusterInfo, rmID string) ([]*PartitionInfo, []*PartitionInfo, error)
- type AllocationInfo
- type ApplicationEvent
- type ApplicationInfo
- func (ai *ApplicationInfo) GetAllAllocations() []*AllocationInfo
- func (ai *ApplicationInfo) GetAllocatedResource() *resources.Resource
- func (ai *ApplicationInfo) GetApplicationState() string
- func (ai *ApplicationInfo) GetTag(tag string) string
- func (ai *ApplicationInfo) GetUser() security.UserGroup
- func (ai *ApplicationInfo) HandleApplicationEvent(event ApplicationEvent) error
- func (ai *ApplicationInfo) SetQueue(leaf *QueueInfo)
- type ApplicationState
- type ClusterInfo
- func (m *ClusterInfo) GetPartition(name string) *PartitionInfo
- func (m *ClusterInfo) GetTotalPartitionResource(partitionName string) *resources.Resource
- func (m *ClusterInfo) HandleEvent(ev interface{})
- func (m *ClusterInfo) ListPartitions() []string
- func (m *ClusterInfo) StartService(handlers handler.EventHandlers)
- type NodeInfo
- func (ni *NodeInfo) AddAllocation(alloc *AllocationInfo)
- func (ni *NodeInfo) CanAllocate(resRequest *resources.Resource) bool
- func (ni *NodeInfo) GetAllAllocations() []*AllocationInfo
- func (ni *NodeInfo) GetAllocatedResource() *resources.Resource
- func (ni *NodeInfo) GetAllocation(uuid string) *AllocationInfo
- func (ni *NodeInfo) GetAttribute(key string) string
- func (ni *NodeInfo) GetAvailableResource() *resources.Resource
- func (ni *NodeInfo) IsSchedulable() bool
- func (ni *NodeInfo) RemoveAllocation(uuid string) *AllocationInfo
- func (ni *NodeInfo) SetSchedulable(schedulable bool)
- type PartitionInfo
- func (pi *PartitionInfo) CalculateNodesResourceUsage() map[string][]int
- func (pi *PartitionInfo) CopyNodeInfos() []*NodeInfo
- func (pi *PartitionInfo) CreateQueues(queueName string) error
- func (pi *PartitionInfo) GetApplications() []*ApplicationInfo
- func (pi *PartitionInfo) GetNewAllocationUUID() string
- func (pi *PartitionInfo) GetNode(nodeID string) *NodeInfo
- func (pi *PartitionInfo) GetNodeSortingPolicy() common.SortingPolicy
- func (pi *PartitionInfo) GetQueue(name string) *QueueInfo
- func (pi *PartitionInfo) GetQueueInfos() []dao.QueueDAOInfo
- func (pi *PartitionInfo) GetRules() []configs.PlacementRule
- func (pi *PartitionInfo) GetTotalAllocationCount() int
- func (pi *PartitionInfo) GetTotalApplicationCount() int
- func (pi *PartitionInfo) GetTotalNodeCount() int
- func (pi *PartitionInfo) GetTotalPartitionResource() *resources.Resource
- func (pi *PartitionInfo) HandlePartitionEvent(event SchedulingObjectEvent) error
- func (pi *PartitionInfo) IsDraining() bool
- func (pi *PartitionInfo) IsRunning() bool
- func (pi *PartitionInfo) IsStopped() bool
- func (pi *PartitionInfo) MarkPartitionForRemoval()
- func (pi *PartitionInfo) NeedPreemption() bool
- func (pi *PartitionInfo) Remove()
- func (pi *PartitionInfo) RemoveApplication(appID string) (*ApplicationInfo, []*AllocationInfo)
- func (pi *PartitionInfo) RemoveNode(nodeID string)
- func (pi *PartitionInfo) RemoveRejectedApp(appID string)
- type QueueInfo
- func (qi *QueueInfo) AddChildQueue(child *QueueInfo) error
- func (qi *QueueInfo) CheckAdminAccess(user security.UserGroup) bool
- func (qi *QueueInfo) CheckSubmitAccess(user security.UserGroup) bool
- func (qi *QueueInfo) DecAllocatedResource(alloc *resources.Resource) error
- func (qi *QueueInfo) GetAllocatedResource() *resources.Resource
- func (qi *QueueInfo) GetCopyOfChildren() map[string]*QueueInfo
- func (qi *QueueInfo) GetQueuePath() string
- func (qi *QueueInfo) HandleQueueEvent(event SchedulingObjectEvent) error
- func (qi *QueueInfo) IncAllocatedResource(alloc *resources.Resource, nodeReported bool) error
- func (qi *QueueInfo) IsDraining() bool
- func (qi *QueueInfo) IsLeafQueue() bool
- func (qi *QueueInfo) IsManaged() bool
- func (qi *QueueInfo) IsRunning() bool
- func (qi *QueueInfo) IsStopped() bool
- func (qi *QueueInfo) MarkQueueForRemoval()
- func (qi *QueueInfo) RemoveQueue() bool
- type SchedulingObjectEvent
- type SchedulingObjectState
Constants ¶
const ( DOT = "." DotReplace = "_dot_" // How to sort applications, valid options are fair / fifo ApplicationSortPolicy = "application.sort.policy" )
Variables ¶
This section is empty.
Functions ¶
func GetChildQueueInfos ¶
func GetChildQueueInfos(info *QueueInfo) []dao.QueueDAOInfo
TODO fix this: should only return one element, only a root queue; remove hard coded values and unknown AbsUsedCapacity; map status to the draining flag.
func UpdateClusterInfoFromConfigFile ¶
func UpdateClusterInfoFromConfigFile(clusterInfo *ClusterInfo, rmID string) ([]*PartitionInfo, []*PartitionInfo, error)
Update the existing cluster info: - add new partitions - update existing partitions - remove deleted partitions. Updates and adds are processed differently internally; outside of this method they are the same.
Types ¶
type AllocationInfo ¶
type AllocationInfo struct { // Original protocol AllocationProto *si.Allocation // Other information ApplicationID string AllocatedResource *resources.Resource }
Related to Allocation
func NewAllocationInfo ¶
func NewAllocationInfo(uuid string, alloc *commonevents.AllocationProposal) *AllocationInfo
type ApplicationEvent ¶
type ApplicationEvent int
---------------------------------- application events ----------------------------------
const ( AcceptApplication ApplicationEvent = iota RejectApplication RunApplication CompleteApplication KillApplication )
func (ApplicationEvent) String ¶
func (ae ApplicationEvent) String() string
type ApplicationInfo ¶
type ApplicationInfo struct { ApplicationID string Partition string QueueName string SubmissionTime int64 // contains filtered or unexported fields }
Related to applications
func NewApplicationInfo ¶
func NewApplicationInfo(appID, partition, queueName string, ugi security.UserGroup, tags map[string]string) *ApplicationInfo
Create a new application
func (*ApplicationInfo) GetAllAllocations ¶
func (ai *ApplicationInfo) GetAllAllocations() []*AllocationInfo
Return the current allocations for the application.
func (*ApplicationInfo) GetAllocatedResource ¶
func (ai *ApplicationInfo) GetAllocatedResource() *resources.Resource
Return the total allocated resources for the application.
func (*ApplicationInfo) GetApplicationState ¶
func (ai *ApplicationInfo) GetApplicationState() string
Return the current state for the application. The state machine handles the locking.
func (*ApplicationInfo) GetTag ¶
func (ai *ApplicationInfo) GetTag(tag string) string
Get a tag from the application Note: Tags are not case sensitive
func (*ApplicationInfo) GetUser ¶
func (ai *ApplicationInfo) GetUser() security.UserGroup
get a copy of the user details for the application
func (*ApplicationInfo) HandleApplicationEvent ¶
func (ai *ApplicationInfo) HandleApplicationEvent(event ApplicationEvent) error
Handle the state event for the application. The state machine handles the locking.
func (*ApplicationInfo) SetQueue ¶
func (ai *ApplicationInfo) SetQueue(leaf *QueueInfo)
Set the leaf queue the application runs in. Update the queue name also to match as this might be different from the queue that was given when submitting the application.
type ApplicationState ¶
type ApplicationState int
---------------------------------- application states ----------------------------------
const ( New ApplicationState = iota Accepted Rejected Running Completed Killed )
func (ApplicationState) String ¶
func (as ApplicationState) String() string
type ClusterInfo ¶
type ClusterInfo struct { // RM Event Handler EventHandlers handler.EventHandlers // contains filtered or unexported fields }
func NewClusterInfo ¶
func NewClusterInfo() (info *ClusterInfo)
func (*ClusterInfo) GetPartition ¶
func (m *ClusterInfo) GetPartition(name string) *PartitionInfo
Get the partition by name. Locked call, used outside of the cache.
func (*ClusterInfo) GetTotalPartitionResource ¶
func (m *ClusterInfo) GetTotalPartitionResource(partitionName string) *resources.Resource
Get the total resources for the partition by name. Locked call, used outside of the cache.
func (*ClusterInfo) HandleEvent ¶
func (m *ClusterInfo) HandleEvent(ev interface{})
Implement methods for Cache events
func (*ClusterInfo) ListPartitions ¶
func (m *ClusterInfo) ListPartitions() []string
Get the list of partitions. Locked call, used outside of the cache.
func (*ClusterInfo) StartService ¶
func (m *ClusterInfo) StartService(handlers handler.EventHandlers)
Start service
type NodeInfo ¶
type NodeInfo struct { // Fields for fast access These fields are considered read only. // Values should only be set when creating a new node and never changed. NodeID string Hostname string Rackname string Partition string TotalResource *resources.Resource // contains filtered or unexported fields }
The node structure used throughout the system
func NewNodeForSort ¶
Node to test with sorters (setting available resources)
func NewNodeForTest ¶
Node to test with anything but the sorters (setting total resources)
func NewNodeInfo ¶
func NewNodeInfo(proto *si.NewNodeInfo) *NodeInfo
Create a new node from the protocol object. The object can only be nil if the si.NewNodeInfo is nil, otherwise a valid object is returned.
func (*NodeInfo) AddAllocation ¶
func (ni *NodeInfo) AddAllocation(alloc *AllocationInfo)
Add the allocation to the node. Used resources will increase, available will decrease. This cannot fail. A nil AllocationInfo makes no changes.
func (*NodeInfo) CanAllocate ¶
Check if the allocation fits in the currently available resources.
func (*NodeInfo) GetAllAllocations ¶
func (ni *NodeInfo) GetAllAllocations() []*AllocationInfo
Get a copy of the allocations on this node
func (*NodeInfo) GetAllocatedResource ¶
Return the currently allocated resource for the node. It returns a cloned object as we do not want to allow modifications to be made to the value of the node.
func (*NodeInfo) GetAllocation ¶
func (ni *NodeInfo) GetAllocation(uuid string) *AllocationInfo
Return the allocation based on the uuid of the allocation. returns nil if the allocation is not found
func (*NodeInfo) GetAttribute ¶
Get an attribute by name. The most used attributes can be directly accessed via the fields: HostName, RackName and Partition. This is a lock free call. All attributes are considered read only
func (*NodeInfo) GetAvailableResource ¶
Return the currently available resource for the node. It returns a cloned object as we do not want to allow modifications to be made to the value of the node.
func (*NodeInfo) IsSchedulable ¶
Can this node be used in scheduling.
func (*NodeInfo) RemoveAllocation ¶
func (ni *NodeInfo) RemoveAllocation(uuid string) *AllocationInfo
Remove the allocation from the node. Returns nil if the allocation was not found and no changes are made. If the allocation is found the AllocationInfo removed is returned. Used resources will decrease, available will increase as per the allocation found.
func (*NodeInfo) SetSchedulable ¶
Set the node to unschedulable. This will cause the node to be skipped during the scheduling cycle. Visible for testing only
type PartitionInfo ¶
type PartitionInfo struct { Name string Root *QueueInfo RmID string // contains filtered or unexported fields }
Related to partitions
func CreatePartitionInfo ¶
func CreatePartitionInfo(data []byte) (*PartitionInfo, error)
func NewPartitionInfo ¶
func NewPartitionInfo(partition configs.PartitionConfig, rmID string, info *ClusterInfo) (*PartitionInfo, error)
Create a new partition from scratch based on a validated configuration. If the configuration did not pass validation and is processed weird errors could occur.
func SetClusterInfoFromConfigFile ¶
func SetClusterInfoFromConfigFile(clusterInfo *ClusterInfo, rmID string, policyGroup string) ([]*PartitionInfo, error)
Create the new partition configuration and add all of them to the cluster. This function may only be called by the scheduler when a RM registers. It creates a new PartitionInfo from scratch and does not merge the configurations.
func (*PartitionInfo) CalculateNodesResourceUsage ¶
func (pi *PartitionInfo) CalculateNodesResourceUsage() map[string][]int
calculate overall nodes resource usage and returns a map as the result, where the key is the resource name, e.g memory, and the value is a []int, which is a slice with 10 elements, each element represents a range of resource usage, such as
0: 0%->10% 1: 10% -> 20% ... 9: 90% -> 100%
the element value represents the number of nodes that fall into this bucket. if slice[9] = 3, this means there are 3 nodes whose resource usage is in the range 90% to 100%.
func (*PartitionInfo) CopyNodeInfos ¶
func (pi *PartitionInfo) CopyNodeInfos() []*NodeInfo
Return a copy of all the nodes registered to this partition
func (*PartitionInfo) CreateQueues ¶
func (pi *PartitionInfo) CreateQueues(queueName string) error
Create the new queue that is returned from a rule. It creates a queue with all parents needed.
func (*PartitionInfo) GetApplications ¶
func (pi *PartitionInfo) GetApplications() []*ApplicationInfo
func (*PartitionInfo) GetNewAllocationUUID ¶
func (pi *PartitionInfo) GetNewAllocationUUID() string
Generate a new uuid for the allocation. This is guaranteed to return a unique ID for this partition.
func (*PartitionInfo) GetNode ¶
func (pi *PartitionInfo) GetNode(nodeID string) *NodeInfo
Get the node object for the node ID as tracked by the partition. This will return nil if the node is not part of this partition. Visible by tests
func (*PartitionInfo) GetNodeSortingPolicy ¶
func (pi *PartitionInfo) GetNodeSortingPolicy() common.SortingPolicy
Is bin-packing scheduling enabled? TODO: more finer enum based return model here is better instead of bool.
func (*PartitionInfo) GetQueue ¶
func (pi *PartitionInfo) GetQueue(name string) *QueueInfo
Get the queue from the structure based on the fully qualified name. Wrapper around the unlocked version getQueue()
func (*PartitionInfo) GetQueueInfos ¶
func (pi *PartitionInfo) GetQueueInfos() []dao.QueueDAOInfo
TODO fix this: should only return one element, only a root queue; remove hard coded values and unknown AbsUsedCapacity; map status to the draining flag.
func (*PartitionInfo) GetRules ¶
func (pi *PartitionInfo) GetRules() []configs.PlacementRule
Return the config element for the placement rules
func (*PartitionInfo) GetTotalAllocationCount ¶
func (pi *PartitionInfo) GetTotalAllocationCount() int
func (*PartitionInfo) GetTotalApplicationCount ¶
func (pi *PartitionInfo) GetTotalApplicationCount() int
func (*PartitionInfo) GetTotalNodeCount ¶
func (pi *PartitionInfo) GetTotalNodeCount() int
func (*PartitionInfo) GetTotalPartitionResource ¶
func (pi *PartitionInfo) GetTotalPartitionResource() *resources.Resource
func (*PartitionInfo) HandlePartitionEvent ¶
func (pi *PartitionInfo) HandlePartitionEvent(event SchedulingObjectEvent) error
Handle the state event for the partition. The state machine handles the locking.
func (*PartitionInfo) IsDraining ¶
func (pi *PartitionInfo) IsDraining() bool
Is the partition marked for deletion and can only handle existing application requests. No new applications will be accepted.
func (*PartitionInfo) IsRunning ¶
func (pi *PartitionInfo) IsRunning() bool
func (*PartitionInfo) IsStopped ¶
func (pi *PartitionInfo) IsStopped() bool
func (*PartitionInfo) MarkPartitionForRemoval ¶
func (pi *PartitionInfo) MarkPartitionForRemoval()
Mark the partition for removal from the system. This can be executed multiple times and is only effective the first time.
func (*PartitionInfo) NeedPreemption ¶
func (pi *PartitionInfo) NeedPreemption() bool
Does the partition allow pre-emption?
func (*PartitionInfo) Remove ¶
func (pi *PartitionInfo) Remove()
The partition has been removed from the configuration and must be removed. This is the cleanup triggered by the exiting scheduler partition. Just unlinking from the cluster should be enough. All other clean up is triggered from the scheduler
func (*PartitionInfo) RemoveApplication ¶
func (pi *PartitionInfo) RemoveApplication(appID string) (*ApplicationInfo, []*AllocationInfo)
Remove the application from the partition. This will also release all the allocations for application from the queue and nodes.
func (*PartitionInfo) RemoveNode ¶
func (pi *PartitionInfo) RemoveNode(nodeID string)
Remove a node from the partition. This locks the partition and calls the internal unlocked version.
func (*PartitionInfo) RemoveRejectedApp ¶
func (pi *PartitionInfo) RemoveRejectedApp(appID string)
Remove a rejected application from the partition. This is just a cleanup, the app has not been scheduled yet.
type QueueInfo ¶
type QueueInfo struct { Name string MaxResource *resources.Resource // When not set, max = nil GuaranteedResource *resources.Resource // When not set, Guaranteed == 0 Parent *QueueInfo // link to the parent queue Properties map[string]string // this should be treated as immutable the value is a merge of parent(s) // contains filtered or unexported fields }
The queue structure as used throughout the scheduler
func NewManagedQueue ¶
func NewManagedQueue(conf configs.QueueConfig, parent *QueueInfo) (*QueueInfo, error)
Create a new queue from the configuration object. The configuration is validated before we call this: we should not see any errors.
func NewUnmanagedQueue ¶
Create a new unmanaged queue. Rule based queue which might not fit in the structure or fail parsing.
func (*QueueInfo) AddChildQueue ¶
Add a new child queue to this queue - can only add to a non leaf queue - cannot add when the queue is marked for deletion - if this is the first child initialise
func (*QueueInfo) CheckAdminAccess ¶
Check if the user has access to the queue for admin actions recursively.
func (*QueueInfo) CheckSubmitAccess ¶
Check if the user has access to the queue to submit an application recursively. This will check the submit ACL and the admin ACL.
func (*QueueInfo) DecAllocatedResource ¶
Decrement the allocated resources for this queue (recursively) Guard against going below zero resources.
func (*QueueInfo) GetAllocatedResource ¶
func (*QueueInfo) GetCopyOfChildren ¶
func (*QueueInfo) GetQueuePath ¶
Get the fully qualified path name
func (*QueueInfo) HandleQueueEvent ¶
func (qi *QueueInfo) HandleQueueEvent(event SchedulingObjectEvent) error
Handle the state event for the queue. The state machine handles the locking.
func (*QueueInfo) IncAllocatedResource ¶
Increment the allocated resources for this queue (recursively) Guard against going over max resources if the
func (*QueueInfo) IsDraining ¶
Is the queue marked for deletion and can only handle existing application requests. No new applications will be accepted.
func (*QueueInfo) IsLeafQueue ¶
Return if this is a leaf queue or not
func (*QueueInfo) MarkQueueForRemoval ¶
func (qi *QueueInfo) MarkQueueForRemoval()
Mark the managed queue for removal from the system. This can be executed multiple times and is only effective the first time. This is a noop on an unmanaged queue
func (*QueueInfo) RemoveQueue ¶
Remove the queue from the structure. Since nothing is allocated there shouldn't be anything referencing this queue any more. The real removal is removing the queue from the parent's child list
type SchedulingObjectEvent ¶
type SchedulingObjectEvent int
---------------------------------- object events these events are used for: partitions and managed queues ----------------------------------
const ( Remove SchedulingObjectEvent = iota Start Stop )
func (SchedulingObjectEvent) String ¶
func (soe SchedulingObjectEvent) String() string
type SchedulingObjectState ¶
type SchedulingObjectState int
---------------------------------- object states these states are used by: partitions and managed queues ----------------------------------
const ( Active SchedulingObjectState = iota Draining Stopped )
func (SchedulingObjectState) String ¶
func (sos SchedulingObjectState) String() string