queuejob

package
v0.4.2
Warning: This package is not in the latest version of its module.
Published: Mar 29, 2019 License: Apache-2.0 Imports: 38 Imported by: 0

Documentation

Constants

const (
	// QueueJobNameLabel label string for queuejob name
	QueueJobNameLabel string = "xqueuejob-name"

	// ControllerUIDLabel label string for queuejob controller uid
	ControllerUIDLabel string = "controller-uid"
)

const (
	// QueueJobLabel label string for queuejob name
	QueueJobLabel string = "queuejob.kube-arbitrator.k8s.io"
)

Variables

This section is empty.

Functions

func GetPodTemplate

func GetPodTemplate(qjobRes *arbv1.XQueueJobResource) (*v1.PodTemplateSpec, error)

func GetQJFullName

func GetQJFullName(qj *arbv1.QueueJob) string

GetQJFullName returns a name that uniquely identifies a QueueJob.
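
As a rough illustration of what such a full name might look like, the sketch below joins a name and a namespace with an underscore. The `queueJob` struct and the `fullName` helper are hypothetical stand-ins, and the delimiter is an assumption; this page does not show the format the package actually produces.

```go
package main

import "fmt"

// queueJob is a hypothetical stand-in for arbv1.QueueJob; only the
// fields needed for naming are modeled.
type queueJob struct {
	Name      string
	Namespace string
}

// fullName joins name and namespace with "_". The delimiter is an
// assumption made for this sketch: underscores are not valid in
// Kubernetes object names, so the two parts stay unambiguous.
func fullName(qj *queueJob) string {
	return qj.Name + "_" + qj.Namespace
}

func main() {
	qj := &queueJob{Name: "train", Namespace: "team-a"}
	fmt.Println(fullName(qj)) // train_team-a
}
```

A key built this way can distinguish two jobs that share a name but live in different namespaces.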

func GetXQJFullName

func GetXQJFullName(qj *arbv1.XQueueJob) string

func HigherPriorityQJ

func HigherPriorityQJ(qj1, qj2 interface{}) bool

func RegisterAllQueueJobResourceTypes

func RegisterAllQueueJobResourceTypes(regs *queuejobresources.RegisteredResources)

RegisterAllQueueJobResourceTypes registers all resources.

Types

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

Controller is the QueueJob controller type.

func NewQueueJobController

func NewQueueJobController(config *rest.Config) *Controller

NewQueueJobController creates a new QueueJob controller.

func (*Controller) Run

func (cc *Controller) Run(stopCh chan struct{})

Run starts the QueueJob controller.
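
The `stopCh` parameter follows the usual Go convention of signaling shutdown by closing a channel. The sketch below shows that convention with a hypothetical `controller` type. It assumes `Run` starts a background worker and returns, which this page does not confirm for the real `Controller`; the `done` channel and the ticker are illustration-only details.

```go
package main

import (
	"fmt"
	"time"
)

// controller is a hypothetical stand-in for the package's Controller.
type controller struct {
	ticks int
	done  chan struct{} // closed once the worker goroutine has exited
}

func newController() *controller {
	return &controller{done: make(chan struct{})}
}

// Run starts a background worker and returns immediately (an assumption
// of this sketch). The worker loops until stopCh is closed.
func (c *controller) Run(stopCh chan struct{}) {
	go func() {
		defer close(c.done)
		ticker := time.NewTicker(time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-stopCh:
				return
			case <-ticker.C:
				c.ticks++ // placeholder for one round of work
			}
		}
	}()
}

func main() {
	stopCh := make(chan struct{})
	c := newController()
	c.Run(stopCh)

	time.Sleep(10 * time.Millisecond)
	close(stopCh) // signal shutdown
	<-c.done      // wait for the worker to exit
	fmt.Println("controller stopped")
}
```

Closing the channel, rather than sending on it, lets any number of goroutines observe the same shutdown signal.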

type Heap

type Heap struct {
	// contains filtered or unexported fields
}

Heap is a thread-safe producer/consumer queue that implements a heap data structure. It can be used to implement priority queues and similar data structures.

func (*Heap) Add

func (h *Heap) Add(obj interface{}) error

Add inserts an item, and puts it in the queue. The item is updated if it already exists.

func (*Heap) AddIfNotPresent

func (h *Heap) AddIfNotPresent(obj interface{}) error

AddIfNotPresent inserts an item, and puts it in the queue. If an item with the key is present in the map, no change is made to the item.

func (*Heap) BulkAdd

func (h *Heap) BulkAdd(list []interface{}) error

BulkAdd adds all the items in the list to the queue.

func (*Heap) Delete

func (h *Heap) Delete(obj interface{}) error

Delete removes an item.

func (*Heap) Get

func (h *Heap) Get(obj interface{}) (interface{}, bool, error)

Get returns the requested item, or sets exists=false.

func (*Heap) GetByKey

func (h *Heap) GetByKey(key string) (interface{}, bool, error)

GetByKey returns the requested item, or sets exists=false.

func (*Heap) List

func (h *Heap) List() []interface{}

List returns a list of all the items.

func (*Heap) Pop

func (h *Heap) Pop() (interface{}, error)

Pop returns the head of the heap.

func (*Heap) Update

func (h *Heap) Update(obj interface{}) error

Update is the same as Add in this implementation. When the item does not exist, it is added.
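
The add-or-update behavior described above can be sketched with the standard `container/heap` package plus a key-to-item map. Everything here (`keyedHeap`, `heapData`, `task`) is hypothetical and is not the package's implementation. In particular, this `Pop` returns an error on an empty heap rather than waiting for a producer.

```go
package main

import (
	"container/heap"
	"errors"
	"fmt"
	"sync"
)

// task is a hypothetical item type used only in this sketch.
type task struct {
	name     string
	priority int
}

// entry pairs a stored object with its position in heapData.keys.
type entry struct {
	obj   interface{}
	index int
}

// heapData implements heap.Interface over keys; items gives lookup by key.
type heapData struct {
	items map[string]*entry
	keys  []string
	less  func(a, b interface{}) bool
}

func (d *heapData) Len() int { return len(d.keys) }
func (d *heapData) Less(i, j int) bool {
	return d.less(d.items[d.keys[i]].obj, d.items[d.keys[j]].obj)
}
func (d *heapData) Swap(i, j int) {
	d.keys[i], d.keys[j] = d.keys[j], d.keys[i]
	d.items[d.keys[i]].index, d.items[d.keys[j]].index = i, j
}
func (d *heapData) Push(x interface{}) { d.keys = append(d.keys, x.(string)) }
func (d *heapData) Pop() interface{} {
	key := d.keys[len(d.keys)-1]
	d.keys = d.keys[:len(d.keys)-1]
	obj := d.items[key].obj
	delete(d.items, key)
	return obj
}

// keyedHeap guards heapData with a mutex so goroutines can share it.
type keyedHeap struct {
	mu   sync.Mutex
	data *heapData
	key  func(obj interface{}) (string, error)
}

func newKeyedHeap(key func(interface{}) (string, error), less func(a, b interface{}) bool) *keyedHeap {
	return &keyedHeap{key: key, data: &heapData{items: map[string]*entry{}, less: less}}
}

// Add inserts obj; if its key is already present, the stored object is
// replaced and the heap order is repaired.
func (h *keyedHeap) Add(obj interface{}) error {
	k, err := h.key(obj)
	if err != nil {
		return err
	}
	h.mu.Lock()
	defer h.mu.Unlock()
	if e, ok := h.data.items[k]; ok {
		e.obj = obj
		heap.Fix(h.data, e.index)
		return nil
	}
	h.data.items[k] = &entry{obj: obj, index: h.data.Len()}
	heap.Push(h.data, k)
	return nil
}

// Pop removes and returns the head, or an error if the heap is empty.
func (h *keyedHeap) Pop() (interface{}, error) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.data.Len() == 0 {
		return nil, errors.New("heap is empty")
	}
	return heap.Pop(h.data), nil
}

func taskKey(obj interface{}) (string, error) { return obj.(task).name, nil }
func higher(a, b interface{}) bool            { return a.(task).priority > b.(task).priority }

func main() {
	h := newKeyedHeap(taskKey, higher)
	h.Add(task{"a", 1})
	h.Add(task{"b", 5})
	h.Add(task{"a", 9}) // same key as the first Add: updated in place
	first, _ := h.Pop()
	fmt.Println(first.(task).name, first.(task).priority) // a 9
}
```

Tracking each item's index in the map is what makes an in-place update cheap: `heap.Fix` can restore order from the changed position instead of rebuilding the heap.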

type KeyFunc

type KeyFunc func(obj interface{}) (string, error)

type LessFunc

type LessFunc func(interface{}, interface{}) bool
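
KeyFunc and LessFunc carry no documentation of their own, so the following sketch shows one plausible pair of implementations. The `job` type, the `namespace/name` key format, and the "larger value wins" ordering are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// KeyFunc and LessFunc repeat the signatures documented above.
type KeyFunc func(obj interface{}) (string, error)
type LessFunc func(interface{}, interface{}) bool

// job is a hypothetical item type for this sketch.
type job struct {
	Namespace, Name string
	Priority        int
}

// jobKey builds a "namespace/name" key and rejects unexpected types.
func jobKey(obj interface{}) (string, error) {
	j, ok := obj.(*job)
	if !ok {
		return "", fmt.Errorf("expected *job, got %T", obj)
	}
	return j.Namespace + "/" + j.Name, nil
}

// higherPriority orders jobs so that larger Priority values come first.
func higherPriority(a, b interface{}) bool {
	return a.(*job).Priority > b.(*job).Priority
}

func main() {
	var key KeyFunc = jobKey
	var less LessFunc = higherPriority

	jobs := []interface{}{
		&job{"team-a", "etl", 1},
		&job{"team-b", "train", 10},
	}
	// Sort with the LessFunc, then print each job's key.
	sort.Slice(jobs, func(i, j int) bool { return less(jobs[i], jobs[j]) })
	for _, j := range jobs {
		k, _ := key(j)
		fmt.Println(k)
	}
}
```

Because both function types take `interface{}`, each implementation has to assert the concrete type itself; returning an error from the key function is one way to surface a mismatch.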

type PriorityQueue

type PriorityQueue struct {
	// contains filtered or unexported fields
}

PriorityQueue implements a scheduling queue. It is an alternative to FIFO. The head of PriorityQueue is the highest-priority pending QJ. The structure has two sub-queues. One, called activeQ, is a Heap that holds QJs being considered for scheduling. The other, called unschedulableQ, holds QJs that have already been tried and determined to be unschedulable. Heap is already thread-safe, but another lock is acquired here to ensure atomicity of operations that span the two data structures.

func NewPriorityQueue

func NewPriorityQueue() *PriorityQueue

func (*PriorityQueue) Add

func (p *PriorityQueue) Add(qj *qjobv1.XQueueJob) error

Add adds a QJ to the active queue. It should be called only when a new QJ is added so there is no chance the QJ is already in either queue.

func (*PriorityQueue) AddIfNotPresent

func (p *PriorityQueue) AddIfNotPresent(qj *qjobv1.XQueueJob) error

AddIfNotPresent adds a QJ to the active queue if it is not present in either of the two queues. If it is present in either, it does nothing.

func (*PriorityQueue) AddUnschedulableIfNotPresent

func (p *PriorityQueue) AddUnschedulableIfNotPresent(qj *qjobv1.XQueueJob) error

AddUnschedulableIfNotPresent does nothing if the QJ is present in either queue. Otherwise it adds the QJ to the unschedulable queue if p.receivedMoveRequest is false, and to the activeQ if p.receivedMoveRequest is true.
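
The routing rule above can be modeled in a few lines. This is a hypothetical, single-goroutine sketch: the `queue` type and its helpers are invented for illustration, locking and priority ordering are left out, and it assumes that a move request sets `receivedMoveRequest`, which this page implies but does not state.

```go
package main

import "fmt"

// queue is a hypothetical model of the two sub-queues, keyed by job name.
type queue struct {
	active              map[string]bool
	unschedulable       map[string]bool
	receivedMoveRequest bool
}

func newQueue() *queue {
	return &queue{active: map[string]bool{}, unschedulable: map[string]bool{}}
}

// addUnschedulableIfNotPresent parks the job unless a move request arrived
// during the current cycle, in which case the job goes to the active set.
func (q *queue) addUnschedulableIfNotPresent(name string) {
	if q.active[name] || q.unschedulable[name] {
		return // already queued somewhere: nothing to do
	}
	if q.receivedMoveRequest {
		q.active[name] = true
		return
	}
	q.unschedulable[name] = true
}

// moveAllToActive drains the unschedulable set and records the request
// (setting the flag here is an assumption of this sketch).
func (q *queue) moveAllToActive() {
	for name := range q.unschedulable {
		q.active[name] = true
		delete(q.unschedulable, name)
	}
	q.receivedMoveRequest = true
}

// startCycle models the part of Pop that clears the flag.
func (q *queue) startCycle() { q.receivedMoveRequest = false }

func main() {
	q := newQueue()

	q.addUnschedulableIfNotPresent("job-a") // no move request yet: parked
	fmt.Println(q.unschedulable["job-a"], q.active["job-a"]) // true false

	q.moveAllToActive()                     // cluster changed: retry everything
	q.addUnschedulableIfNotPresent("job-b") // arrives after the move request
	fmt.Println(q.active["job-a"], q.active["job-b"]) // true true

	q.startCycle()
	q.addUnschedulableIfNotPresent("job-c") // new cycle: parked again
	fmt.Println(q.unschedulable["job-c"]) // true
}
```

The flag covers the case where a job fails scheduling at about the same time the cluster changes: without it, the job could be parked just after the event that would have made it schedulable.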

func (*PriorityQueue) Delete

func (p *PriorityQueue) Delete(qj *qjobv1.XQueueJob) error

Delete deletes the item from either of the two queues. It assumes the QJ is in only one queue.

func (*PriorityQueue) MoveAllToActiveQueue

func (p *PriorityQueue) MoveAllToActiveQueue()

MoveAllToActiveQueue moves all QJs from unschedulableQ to activeQ. This function adds all QJs and then signals the condition variable to ensure that if Pop() is waiting for an item, it receives it after all the QJs are in the queue and the head is the highest-priority QJ. TODO(bsalamat): We should add a back-off mechanism here so that a high priority pod which is unschedulable does not go to the head of the queue frequently. For example, in a cluster where a lot of pods are being deleted, such a high priority pod can deprive other pods of getting scheduled.

func (*PriorityQueue) Pop

func (p *PriorityQueue) Pop() (*qjobv1.XQueueJob, error)

Pop removes the head of the active queue and returns it. It blocks if the activeQ is empty and waits until a new item is added to the queue. It also clears receivedMoveRequest to mark the beginning of a new scheduling cycle.
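
A blocking Pop of this kind is commonly built on `sync.Cond`. The sketch below uses a hypothetical `blockingQueue` holding plain strings in FIFO order; the real active queue is a heap ordered by priority, and the field names here are illustrative only.

```go
package main

import (
	"fmt"
	"sync"
)

// blockingQueue is a hypothetical FIFO used to show the blocking behavior.
type blockingQueue struct {
	mu                  sync.Mutex
	cond                *sync.Cond
	items               []string
	receivedMoveRequest bool
}

func newBlockingQueue() *blockingQueue {
	q := &blockingQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Add appends an item and wakes any goroutine waiting in Pop.
func (q *blockingQueue) Add(name string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, name)
	q.cond.Broadcast()
}

// Pop waits until the queue is non-empty, removes the head, and clears
// receivedMoveRequest to mark the start of a new cycle.
func (q *blockingQueue) Pop() string {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 {
		q.cond.Wait() // releases mu while waiting, reacquires on wake-up
	}
	head := q.items[0]
	q.items = q.items[1:]
	q.receivedMoveRequest = false
	return head
}

func main() {
	q := newBlockingQueue()
	got := make(chan string)
	go func() { got <- q.Pop() }() // blocks until something is added
	q.Add("job-a")
	fmt.Println(<-got) // job-a
}
```

The emptiness check sits in a `for` loop rather than an `if` because a woken goroutine must re-check the condition after it reacquires the lock.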

func (*PriorityQueue) Update

func (p *PriorityQueue) Update(oldQJ, newQJ *qjobv1.XQueueJob) error

Update updates a QJ in the active queue if present. Otherwise, it removes the item from the unschedulable queue and adds the updated one to the active queue.

type SchedulingQueue

type SchedulingQueue interface {
	Add(qj *qjobv1.XQueueJob) error
	AddIfNotPresent(qj *qjobv1.XQueueJob) error
	AddUnschedulableIfNotPresent(qj *qjobv1.XQueueJob) error
	Pop() (*qjobv1.XQueueJob, error)
	Update(oldQJ, newQJ *qjobv1.XQueueJob) error
	Delete(QJ *qjobv1.XQueueJob) error
	MoveAllToActiveQueue()
}

SchedulingQueue is an interface for a queue to store QJs waiting to be scheduled. The interface follows a pattern similar to cache.FIFO and cache.Heap and makes it easy to use those data structures as a SchedulingQueue.

func NewSchedulingQueue

func NewSchedulingQueue() SchedulingQueue

NewSchedulingQueue initializes a new scheduling queue. If pod priority is enabled a priority queue is returned. If it is disabled, a FIFO is returned.

type UnschedulableQJMap

type UnschedulableQJMap struct {
	// contains filtered or unexported fields
}

UnschedulableQJMap holds QJs that cannot be scheduled. This data structure is used to implement unschedulableQ.

func (*UnschedulableQJMap) Add

func (u *UnschedulableQJMap) Add(pod *qjobv1.XQueueJob)

Add adds a QJ to the unschedulable QJs.

func (*UnschedulableQJMap) Clear

func (u *UnschedulableQJMap) Clear()

Clear removes all the entries from the unschedulable maps.

func (*UnschedulableQJMap) Delete

func (u *UnschedulableQJMap) Delete(pod *qjobv1.XQueueJob)

Delete deletes a QJ from the unschedulable QJs.

func (*UnschedulableQJMap) Get

func (u *UnschedulableQJMap) Get(pod *qjobv1.XQueueJob) *qjobv1.XQueueJob

Get returns the QJ if a QJ with the same key as the key of the given "pod" is found in the map. It returns nil otherwise.

func (*UnschedulableQJMap) Update

func (u *UnschedulableQJMap) Update(pod *qjobv1.XQueueJob)

Update updates a QJ in the unschedulable QJs.
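
The five methods above map naturally onto a Go map keyed by a job's full name. The following is a minimal sketch under that assumption; `queueJob`, `keyOf`, and `unschedulableMap` are hypothetical names, and the key format is a guess. No locking is included, on the assumption that the enclosing queue's lock protects the map.

```go
package main

import "fmt"

// queueJob is a hypothetical stand-in for qjobv1.XQueueJob.
type queueJob struct {
	Namespace, Name string
	Priority        int
}

// keyOf builds the map key; the "name_namespace" format is an assumption.
func keyOf(qj *queueJob) string { return qj.Name + "_" + qj.Namespace }

// unschedulableMap stores parked jobs by key.
type unschedulableMap struct {
	jobs map[string]*queueJob
}

func newUnschedulableMap() *unschedulableMap {
	return &unschedulableMap{jobs: map[string]*queueJob{}}
}

// Add stores the job, replacing any entry with the same key.
func (u *unschedulableMap) Add(qj *queueJob) { u.jobs[keyOf(qj)] = qj }

// Update replaces the stored job with the same key.
func (u *unschedulableMap) Update(qj *queueJob) { u.jobs[keyOf(qj)] = qj }

// Delete removes the job with the same key, if any.
func (u *unschedulableMap) Delete(qj *queueJob) { delete(u.jobs, keyOf(qj)) }

// Get returns the stored job with the same key, or nil if none exists.
func (u *unschedulableMap) Get(qj *queueJob) *queueJob { return u.jobs[keyOf(qj)] }

// Clear drops every entry.
func (u *unschedulableMap) Clear() { u.jobs = map[string]*queueJob{} }

func main() {
	u := newUnschedulableMap()
	u.Add(&queueJob{Namespace: "team-a", Name: "train", Priority: 1})
	u.Update(&queueJob{Namespace: "team-a", Name: "train", Priority: 7})

	// Lookup uses only the key fields of the argument.
	stored := u.Get(&queueJob{Namespace: "team-a", Name: "train"})
	fmt.Println(stored.Priority) // 7

	u.Clear()
	fmt.Println(u.Get(&queueJob{Namespace: "team-a", Name: "train"}) == nil) // true
}
```

Lookups by key are what let a queue's Update and Delete find a parked job from a fresh copy of the object, without holding on to the original pointer.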

type UnschedulableQJs

type UnschedulableQJs interface {
	Add(p *qjobv1.XQueueJob)
	Delete(p *qjobv1.XQueueJob)
	Update(p *qjobv1.XQueueJob)
	Get(p *qjobv1.XQueueJob) *qjobv1.XQueueJob
	Clear()
}

UnschedulableQJs is an interface for a queue that is used to keep unschedulable QJs. These QJs are not actively reevaluated for scheduling. They are moved to the active scheduling queue on certain events, such as termination of a pod in the cluster, addition of nodes, etc.

type UnschedulableQueueJobs

type UnschedulableQueueJobs interface {
	Add(pod *qjobv1.XQueueJob)
	Delete(pod *qjobv1.XQueueJob)
	Update(pod *qjobv1.XQueueJob)
	Get(pod *qjobv1.XQueueJob) *qjobv1.XQueueJob
	Clear()
}

type XController

type XController struct {
	// contains filtered or unexported fields
}

XController is the XQueueJob controller type.

func NewXQueueJobController

func NewXQueueJobController(config *rest.Config, schedulerName string) *XController

NewXQueueJobController creates a new XQueueJob controller.

func (*XController) Cleanup

func (cc *XController) Cleanup(queuejob *arbv1.XQueueJob) error

Cleanup function

func (*XController) GetAggregatedResources

func (qjm *XController) GetAggregatedResources(cqj *arbv1.XQueueJob) *schedulerapi.Resource

func (*XController) GetQueueJobsEligibleForPreemption

func (qjm *XController) GetQueueJobsEligibleForPreemption() []*arbv1.XQueueJob

func (*XController) PreemptQueueJobs

func (qjm *XController) PreemptQueueJobs()

func (*XController) Run

func (cc *XController) Run(stopCh chan struct{})

Run starts the XQueueJob controller.

func (*XController) ScheduleNext

func (qjm *XController) ScheduleNext()

func (*XController) UpdateQueueJobs

func (qjm *XController) UpdateQueueJobs()
