queue

package
v0.0.0-...-dcfb068
Published: Apr 19, 2024 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Constants

const (
	// Name is the name of the plugin used in the plugin registry and configurations.
	Name = "GodelSort"
)

Functions

func InitUnitQueueSortPlugin

func InitUnitQueueSortPlugin(spec *framework.PluginSpec, pluginArgs map[string]*config.PluginConfig) (framework.UnitQueueSortPlugin, error)

func MakeNextUnitFunc

func MakeNextUnitFunc(queue SchedulingQueue) func() *framework.QueuedUnitInfo

Types

type BlockQueue

type BlockQueue struct {
	// contains filtered or unexported fields
}

BlockQueue implements a scheduling queue.

TODO(lintong.jiang): LOG - revisit the logging messages in this file.

func NewBlockQueue

func NewBlockQueue(
	cache cache.SchedulerCache,
	pcLister schedulingv1.PriorityClassLister,
	pgLister v1alpha1.PodGroupLister,
	lessFn framework.UnitLessFunc,
	opts ...Option,
) *BlockQueue

NewBlockQueue creates a BlockQueue object.

func (*BlockQueue) ActivePodGroupUnit

func (p *BlockQueue) ActivePodGroupUnit(unitKey string)

func (*BlockQueue) Add

func (p *BlockQueue) Add(pod *v1.Pod) error

Add adds a pod to the ready queue. It should be called only when a new pod is added, so there is no chance the pod is already in the waiting, ready, unschedulable, or backoff queues.

func (*BlockQueue) AddUnschedulableIfNotPresent

func (p *BlockQueue) AddUnschedulableIfNotPresent(unitInfo *framework.QueuedUnitInfo, _ int64) error

AddUnschedulableIfNotPresent inserts a unit that cannot be scheduled into the queue, unless it is already in the queue. Normally, BlockQueue puts unschedulable units in `unschedulableQ`. But if there has been a recent move request, then the unit is put in `backoffQ`.

func (*BlockQueue) AssignedPodAdded

func (p *BlockQueue) AssignedPodAdded(pod *v1.Pod)

We removed the unschedulable-related logic from the BlockQueue, so nothing will be done here.

func (*BlockQueue) AssignedPodUpdated

func (p *BlockQueue) AssignedPodUpdated(pod *v1.Pod)

We removed the unschedulable-related logic from the BlockQueue, so nothing will be done here.

func (*BlockQueue) CanBeRecycle

func (p *BlockQueue) CanBeRecycle() bool

func (*BlockQueue) Close

func (p *BlockQueue) Close()

Close closes the queue.

func (*BlockQueue) Delete

func (p *BlockQueue) Delete(pod *v1.Pod) error

Delete deletes the item from either of the two queues. It assumes the pod is only in one queue.

func (*BlockQueue) MoveAllToActiveOrBackoffQueue

func (p *BlockQueue) MoveAllToActiveOrBackoffQueue(event string)

We removed the unschedulable-related logic from the BlockQueue, so nothing will be done here.

func (*BlockQueue) NumUnschedulableUnits

func (p *BlockQueue) NumUnschedulableUnits() int

NumUnschedulableUnits returns the number of unschedulable pods in the SchedulingQueue. BlockQueue has no unschedulable-related logic, so it always returns 0.

func (*BlockQueue) Peek

func (p *BlockQueue) Peek() *framework.QueuedUnitInfo

func (*BlockQueue) PendingPods

func (p *BlockQueue) PendingPods() []*v1.Pod

PendingPods returns all the pending pods in the queue. This function is used for debugging purposes in the scheduler cache dumper and comparer.

func (*BlockQueue) Pop

func (p *BlockQueue) Pop() (*framework.QueuedUnitInfo, error)

Pop pops a unit for batch scheduling.

func (*BlockQueue) Run

func (p *BlockQueue) Run()

Run starts the goroutine that pumps units from podBackoffQ to activeQ.

func (*BlockQueue) SchedulingCycle

func (p *BlockQueue) SchedulingCycle() int64

SchedulingCycle returns current scheduling cycle.

func (*BlockQueue) Update

func (p *BlockQueue) Update(oldPod, newPod *v1.Pod) error

Update updates a pod in the ready, waiting, or backoff queue if it is present there. Otherwise, if the pod was updated in a way that may make it schedulable, Update removes it from the unschedulable queue and adds the updated pod to the ready/waiting queue. If the pod is not present in any of the queues, it is added to the waiting queue.

type Option

type Option func(*schedulingQueueOptions)

Option configures a PriorityQueue.

func WithAttemptImpactFactorOnPriority

func WithAttemptImpactFactorOnPriority(attemptImpactFactorOnPriority float64) Option

WithAttemptImpactFactorOnPriority sets attemptImpactFactorOnPriority for PriorityQueue.

func WithClock

func WithClock(clock util.Clock) Option

WithClock sets clock for PriorityQueue, the default clock is util.RealClock.

func WithOwner

func WithOwner(owner string) Option

WithOwner sets owner name for PriorityQueue.

func WithPodMaxBackoffDuration

func WithPodMaxBackoffDuration(duration time.Duration) Option

WithPodMaxBackoffDuration sets unit max backoff duration for PriorityQueue.

func WithSubCluster

func WithSubCluster(subCluster string) Option

WithSubCluster sets SubCluster for PriorityQueue.

func WithSwitchType

func WithSwitchType(switchType framework.SwitchType) Option

WithSwitchType sets SwitchType for PriorityQueue.

func WithUnitInitialBackoffDuration

func WithUnitInitialBackoffDuration(duration time.Duration) Option

WithUnitInitialBackoffDuration sets unit initial backoff duration for PriorityQueue.

type PriorityQueue

type PriorityQueue struct {
	// contains filtered or unexported fields
}

PriorityQueue implements a scheduling queue. The head of PriorityQueue is the highest-priority pending pod. The structure has three sub-queues. The first, activeQ, is a heap that holds pods being considered for scheduling. The second, unschedulableQ, holds pods that have already been tried and determined to be unschedulable. The third holds pods that have been moved out of the unschedulable queue; they move to the active queue once their backoff completes.

TODO(lintong.jiang): LOG - revisit the logging messages in this file.
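
To make the three-sub-queue layout concrete, here is a minimal sketch built on the standard container/heap package. It is not the package's implementation: the unit type, the integer priority field, and the use of plain maps for the backoff and unschedulable queues are all assumptions made for illustration (the real queue orders units with the lessFn passed to NewPriorityQueue).

```go
package main

import (
	"container/heap"
	"fmt"
)

// unit is a hypothetical stand-in for framework.QueuedUnitInfo.
type unit struct {
	key      string
	priority int // assumed ordering key; the real queue uses lessFn
}

// unitHeap orders units so that the highest priority is popped first.
type unitHeap []*unit

func (h unitHeap) Len() int            { return len(h) }
func (h unitHeap) Less(i, j int) bool  { return h[i].priority > h[j].priority }
func (h unitHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *unitHeap) Push(x interface{}) { *h = append(*h, x.(*unit)) }
func (h *unitHeap) Pop() interface{} {
	old := *h
	n := len(old)
	u := old[n-1]
	*h = old[:n-1]
	return u
}

// sketchQueue models the three sub-queues described above.
type sketchQueue struct {
	activeQ        unitHeap         // units being considered for scheduling
	backoffQ       map[string]*unit // units waiting for their backoff to complete
	unschedulableQ map[string]*unit // units tried and found unschedulable
}

func newSketchQueue() *sketchQueue {
	return &sketchQueue{
		backoffQ:       map[string]*unit{},
		unschedulableQ: map[string]*unit{},
	}
}

// add pushes a new unit onto the active heap.
func (q *sketchQueue) add(u *unit) { heap.Push(&q.activeQ, u) }

// pop returns the highest-priority active unit, or nil if activeQ is empty.
func (q *sketchQueue) pop() *unit {
	if q.activeQ.Len() == 0 {
		return nil
	}
	return heap.Pop(&q.activeQ).(*unit)
}

func main() {
	q := newSketchQueue()
	q.add(&unit{key: "low", priority: 1})
	q.add(&unit{key: "high", priority: 10})
	fmt.Println(q.pop().key)
}
```

Unlike this sketch, a real scheduling queue would typically block in Pop until a unit is available and would guard all three sub-queues with a lock.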

func NewPriorityQueue

func NewPriorityQueue(
	cache cache.SchedulerCache,
	pcLister schedulingv1.PriorityClassLister,
	pgLister v1alpha1.PodGroupLister,
	lessFn framework.UnitLessFunc,
	opts ...Option,
) *PriorityQueue

NewPriorityQueue creates a PriorityQueue object.

func (*PriorityQueue) ActivePodGroupUnit

func (p *PriorityQueue) ActivePodGroupUnit(unitKey string)

func (*PriorityQueue) Add

func (p *PriorityQueue) Add(pod *v1.Pod) error

Add adds a pod to the ready queue. It should be called only when a new pod is added, so there is no chance the pod is already in the waiting, ready, unschedulable, or backoff queues.

func (*PriorityQueue) AddUnschedulableIfNotPresent

func (p *PriorityQueue) AddUnschedulableIfNotPresent(unitInfo *framework.QueuedUnitInfo, unitSchedulingCycle int64) error

AddUnschedulableIfNotPresent inserts a unit that cannot be scheduled into the queue, unless it is already in the queue. Normally, PriorityQueue puts unschedulable units in `unschedulableQ`. But if there has been a recent move request, then the unit is put in `backoffQ`.
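
The routing rule described above can be sketched as follows. This is a simplified illustration, not the package's code. It assumes that "a recent move request" is detected by comparing a stored moveRequestCycle against the unit's scheduling cycle; that field name and the comparison are assumptions borrowed from how similar scheduler queues work, and the sub-queues are reduced to sets of keys.

```go
package main

import "fmt"

// sketchQueue tracks only which keys sit in which sub-queue.
type sketchQueue struct {
	moveRequestCycle int64           // assumed: cycle of the most recent move request
	backoffQ         map[string]bool // keys waiting out a backoff
	unschedulableQ   map[string]bool // keys parked as unschedulable
}

func newSketchQueue(moveRequestCycle int64) *sketchQueue {
	return &sketchQueue{
		moveRequestCycle: moveRequestCycle,
		backoffQ:         map[string]bool{},
		unschedulableQ:   map[string]bool{},
	}
}

// addUnschedulableIfNotPresent rejects duplicates, then routes the unit:
// a move request at or after the unit's cycle sends it to backoffQ,
// otherwise it goes to unschedulableQ.
func (q *sketchQueue) addUnschedulableIfNotPresent(key string, unitSchedulingCycle int64) error {
	if q.unschedulableQ[key] || q.backoffQ[key] {
		return fmt.Errorf("unit %q is already present in the queue", key)
	}
	if q.moveRequestCycle >= unitSchedulingCycle {
		q.backoffQ[key] = true
		return nil
	}
	q.unschedulableQ[key] = true
	return nil
}

func main() {
	q := newSketchQueue(5)
	_ = q.addUnschedulableIfNotPresent("a", 3) // move request is newer: backoffQ
	_ = q.addUnschedulableIfNotPresent("b", 7) // no newer move request: unschedulableQ
	fmt.Println(q.backoffQ["a"], q.unschedulableQ["b"])
	fmt.Println(q.addUnschedulableIfNotPresent("a", 8) != nil)
}
```

The point of the backoff path is that a cluster event arriving while the unit was being scheduled may already have made it schedulable, so parking it in unschedulableQ could delay it unnecessarily.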

func (*PriorityQueue) AssignedPodAdded

func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod)

AssignedPodAdded is called when a bound pod is added. Creation of this pod may make pending pods with matching affinity terms schedulable.

func (*PriorityQueue) AssignedPodUpdated

func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod)

AssignedPodUpdated is called when a bound pod is updated. Change of labels may make pending pods with matching affinity terms schedulable.

func (*PriorityQueue) CanBeRecycle

func (p *PriorityQueue) CanBeRecycle() bool

func (*PriorityQueue) Close

func (p *PriorityQueue) Close()

Close closes the priority queue.

func (*PriorityQueue) Delete

func (p *PriorityQueue) Delete(pod *v1.Pod) error

Delete deletes the item from either of the two queues. It assumes the pod is only in one queue.

func (*PriorityQueue) MoveAllToActiveOrBackoffQueue

func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(event string)

MoveAllToActiveOrBackoffQueue moves all pods from unschedulableQ to activeQ or backoffQ. This function adds all pods and then signals the condition variable to ensure that if Pop() is waiting for an item, it receives it after all the pods are in the queue and the head is the highest priority pod.
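
The ordering guarantee described above (add everything first, signal afterwards) can be illustrated with sync.Cond. The sketch below is an assumption-laden simplification: it ignores the backoff path entirely, keeps activeQ as a sorted slice instead of a heap, and uses hypothetical type and method names.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// unit is a hypothetical stand-in for a queued scheduling unit.
type unit struct {
	key      string
	priority int
}

type sketchQueue struct {
	mu             sync.Mutex
	cond           *sync.Cond
	activeQ        []unit // kept sorted, highest priority first
	unschedulableQ []unit
}

func newSketchQueue() *sketchQueue {
	q := &sketchQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// moveAllToActive moves every unschedulable unit under the lock and only
// then broadcasts, so a waiting pop never observes a partially moved set.
func (q *sketchQueue) moveAllToActive() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.activeQ = append(q.activeQ, q.unschedulableQ...)
	q.unschedulableQ = nil
	sort.SliceStable(q.activeQ, func(i, j int) bool {
		return q.activeQ[i].priority > q.activeQ[j].priority
	})
	q.cond.Broadcast()
}

// pop blocks until activeQ is non-empty, then returns its head.
func (q *sketchQueue) pop() unit {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.activeQ) == 0 {
		q.cond.Wait()
	}
	u := q.activeQ[0]
	q.activeQ = q.activeQ[1:]
	return u
}

func main() {
	q := newSketchQueue()
	q.unschedulableQ = []unit{{"low", 1}, {"high", 10}, {"mid", 5}}

	got := make(chan unit)
	go func() { got <- q.pop() }()

	q.moveAllToActive()
	fmt.Println((<-got).key)
}
```

Whether the popping goroutine starts waiting before or after the move, it sees the full set of units once it holds the lock, so it always receives the highest-priority one.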

func (*PriorityQueue) NumUnschedulableUnits

func (p *PriorityQueue) NumUnschedulableUnits() int

NumUnschedulableUnits returns the number of unschedulable pods in the SchedulingQueue.

func (*PriorityQueue) Peek

func (p *PriorityQueue) Peek() *framework.QueuedUnitInfo

func (*PriorityQueue) PendingPods

func (p *PriorityQueue) PendingPods() []*v1.Pod

PendingPods returns all the pending pods in the queue. This function is used for debugging purposes in the scheduler cache dumper and comparer.

func (*PriorityQueue) Pop

func (p *PriorityQueue) Pop() (*framework.QueuedUnitInfo, error)

Pop pops a unit for batch scheduling.

func (*PriorityQueue) Run

func (p *PriorityQueue) Run()

Run starts the goroutine that pumps units from podBackoffQ to activeQ.

func (*PriorityQueue) SchedulingCycle

func (p *PriorityQueue) SchedulingCycle() int64

SchedulingCycle returns current scheduling cycle.

func (*PriorityQueue) Update

func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error

Update updates a pod in the ready, waiting, or backoff queue if it is present there. Otherwise, if the pod was updated in a way that may make it schedulable, Update removes it from the unschedulable queue and adds the updated pod to the ready/waiting queue. If the pod is not present in any of the queues, it is added to the waiting queue.

type SchedulingQueue

type SchedulingQueue interface {
	// Run starts the goroutines managing the queue.
	Run()
	// Close closes the SchedulingQueue so that the goroutine which is
	// waiting to pop items can exit gracefully.
	Close()
	CanBeRecycle() bool

	Add(pod *v1.Pod) error
	// AddUnschedulableIfNotPresent adds an unschedulable unit back to the scheduling queue.
	// unitSchedulingCycle is the current scheduling cycle number, which can be
	// obtained by calling SchedulingCycle().
	AddUnschedulableIfNotPresent(unitInfo *framework.QueuedUnitInfo, unitSchedulingCycle int64) error
	Update(oldPod, newPod *v1.Pod) error
	Delete(pod *v1.Pod) error
	AssignedPodAdded(pod *v1.Pod)
	AssignedPodUpdated(pod *v1.Pod)
	MoveAllToActiveOrBackoffQueue(event string)
	ActivePodGroupUnit(unitKey string)

	Pop() (*framework.QueuedUnitInfo, error)
	Peek() *framework.QueuedUnitInfo

	PendingPods() []*v1.Pod
	// NumUnschedulableUnits returns the number of unschedulable pods in the SchedulingQueue.
	NumUnschedulableUnits() int
	// SchedulingCycle returns the current scheduling cycle number, which is
	// cached by the scheduling queue. Normally, incrementing this number whenever
	// a pod is popped (e.g. by calling Pop()) is enough.
	SchedulingCycle() int64
}

SchedulingQueue is an interface for a queue to store pods waiting to be scheduled. The interface follows a pattern similar to cache.FIFO and cache.Heap and makes it easy to use those data structures as a SchedulingQueue.
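
A common way to consume an interface like this is to adapt Pop into a plain "next unit" function, which is the shape MakeNextUnitFunc returns. The sketch below shows one plausible adapter. It is an assumption, not the package's implementation: unitInfo, popper, sliceQueue, and the choice to turn errors into nil are all hypothetical, and the toy queue returns an error when empty whereas a real scheduling queue may block instead.

```go
package main

import (
	"errors"
	"fmt"
)

// unitInfo is a hypothetical stand-in for framework.QueuedUnitInfo.
type unitInfo struct{ key string }

// popper is the subset of SchedulingQueue this sketch needs.
type popper interface {
	Pop() (*unitInfo, error)
}

// makeNextUnitFunc adapts Pop into a func() *unitInfo.
// Assumption: an error from Pop is reported to the caller as nil.
func makeNextUnitFunc(q popper) func() *unitInfo {
	return func() *unitInfo {
		u, err := q.Pop()
		if err != nil {
			return nil
		}
		return u
	}
}

// sliceQueue is a toy FIFO used only to exercise the adapter.
type sliceQueue struct{ items []*unitInfo }

func (s *sliceQueue) Pop() (*unitInfo, error) {
	if len(s.items) == 0 {
		return nil, errors.New("queue is empty")
	}
	u := s.items[0]
	s.items = s.items[1:]
	return u, nil
}

func main() {
	next := makeNextUnitFunc(&sliceQueue{items: []*unitInfo{{key: "a"}}})
	fmt.Println(next().key)
	fmt.Println(next() == nil)
}
```

Depending only on the small popper interface keeps the adapter testable without constructing a full queue, cache, or listers.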

func NewSchedulingQueue

func NewSchedulingQueue(
	cache cache.SchedulerCache,
	pcLister schedulingv1.PriorityClassLister,
	pgLister v1alpha1.PodGroupLister,
	lessFn framework.UnitLessFunc,
	useBlockQueue bool,
	opts ...Option,
) SchedulingQueue

NewSchedulingQueue initializes a priority queue as a new scheduling queue.

type SortPluginFactory

type SortPluginFactory = func(runtime.Object) (framework.UnitQueueSortPlugin, error)

type SubQueue

type SubQueue interface {
	Add(interface{}) error
	Update(interface{}, interface{}) error
	Delete(interface{}) error
	DeleteByKey(string) error
	GetByKey(string) (interface{}, bool, error)

	Peek() interface{}
	Pop() (interface{}, error)
	Process(f heap.ProcessFunc)

	String() string
	Len() int
	List() []interface{}
}
