jobcontroller

package
v0.9.1
Warning

This package is not in the latest version of its module.

Published: Feb 29, 2020 License: Apache-2.0 Imports: 34 Imported by: 0

Documentation

Constants

const (
	// SHAREPODNEEDSYNC is the sync key for high-priority SharePod syncs, used in common/jobcontroller/pod.go.
	SHAREPODNEEDSYNC = "LSALAB-NTHU-SHAREPOD-NEED-SYNC"
)

Variables

This section is empty.

Functions

func GenExpectationPodsKey

func GenExpectationPodsKey(jobKey, replicaType string) string

func GenExpectationServicesKey

func GenExpectationServicesKey(jobKey, replicaType string) string
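Neither key function is documented here, but the `Expectations` field comment on `JobController` lists keys such as `"tf-operator/tfjob-abc/ps/pods"`. The following is a minimal sketch consistent with those examples, assuming the replica type is lower-cased and the suffix is `/pods` or `/services`; it is not necessarily the package's actual implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// GenExpectationPodsKey builds a key of the form "<namespace>/<name>/<replica type>/pods".
// Lower-casing the replica type is an assumption inferred from the documented examples.
func GenExpectationPodsKey(jobKey, replicaType string) string {
	return jobKey + "/" + strings.ToLower(replicaType) + "/pods"
}

// GenExpectationServicesKey is the service counterpart of GenExpectationPodsKey (same assumption).
func GenExpectationServicesKey(jobKey, replicaType string) string {
	return jobKey + "/" + strings.ToLower(replicaType) + "/services"
}

func main() {
	jobKey := "tf-operator/tfjob-abc" // the job's namespace/name
	fmt.Println(GenExpectationPodsKey(jobKey, "PS"))
	fmt.Println(GenExpectationServicesKey(jobKey, "Worker"))
}
```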

func GenGeneralName

func GenGeneralName(jobName, rtype, index string) string
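GenGeneralName is undocumented. A plausible reading of its signature is that it derives a per-replica object name from the job name, replica type, and replica index. The sketch below assumes hyphen separators and replaces any `/` so the result is usable as an object name; both choices are assumptions, not confirmed behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// GenGeneralName joins the job name, replica type and replica index with hyphens.
// The separator and the "/" -> "-" replacement are assumptions for illustration.
func GenGeneralName(jobName, rtype, index string) string {
	n := jobName + "-" + rtype + "-" + index
	return strings.ReplaceAll(n, "/", "-")
}

func main() {
	fmt.Println(GenGeneralName("tfjob-abc", "worker", "0"))
}
```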

func RecheckDeletionTimestamp

func RecheckDeletionTimestamp(getObject func() (metav1.Object, error)) func() error

RecheckDeletionTimestamp returns a CanAdopt() function to recheck deletion.

The CanAdopt() function calls getObject() to fetch the latest value, and denies adoption attempts if that object has a non-nil DeletionTimestamp.

Types

type ControllerInterface

type ControllerInterface interface {
	// Returns the Controller name
	ControllerName() string

	// Returns the GroupVersionKind of the API
	GetAPIGroupVersionKind() schema.GroupVersionKind

	// Returns the GroupVersion of the API
	GetAPIGroupVersion() schema.GroupVersion

	// Returns the Group Name(key) in the labels of the job
	GetGroupNameLabelKey() string

	// Returns the Job Name(key) in the labels of the job
	GetJobNameLabelKey() string

	// Returns the Group Name(value) in the labels of the job
	GetGroupNameLabelValue() string

	// Returns the Replica Type(key) in the labels of the job
	GetReplicaTypeLabelKey() string

	// Returns the Replica Index(key) in the labels of the job
	GetReplicaIndexLabelKey() string

	// Returns the Job Role(key) in the labels of the job
	GetJobRoleKey() string

	// Returns the Job from Informer Cache
	GetJobFromInformerCache(namespace, name string) (metav1.Object, error)

	// Returns the Job from API server
	GetJobFromAPIClient(namespace, name string) (metav1.Object, error)
}

ControllerInterface is the common interface that every operator must implement.

type JobController

type JobController struct {
	Controller ControllerInterface

	Config JobControllerConfiguration

	// PodControl is used to add or delete pods.
	PodControl controller.PodControlInterface

	// ServiceControl is used to add or delete services.
	ServiceControl control.ServiceControlInterface

	// KubeClientSet is a standard Kubernetes clientset.
	KubeClientSet kubeclientset.Interface

	// KubeBatchClientSet is a standard kube-batch clientset.
	KubeBatchClientSet kubebatchclient.Interface

	// PodLister can list/get pods from the shared informer's store.
	PodLister corelisters.PodLister

	// ServiceLister can list/get services from the shared informer's store.
	ServiceLister corelisters.ServiceLister

	// PodInformerSynced returns true if the pod store has been synced at least once.
	PodInformerSynced cache.InformerSynced

	// ServiceInformerSynced returns true if the service store has been synced at least once.
	ServiceInformerSynced cache.InformerSynced

	// Expectations is a TTL cache of the pod/service creations and deletions each job expects to see.
	// The expectation key is the job's namespace/name, followed by the lower-cased ReplicaType
	// and "pods" or "services".
	// For example, for a TFJob with namespace "tf-operator" and name "tfjob-abc" whose replica spec is:
	// {
	//     "PS": {
	//         "Replicas": 2
	//     },
	//     "Worker": {
	//         "Replicas": 4
	//     }
	// }
	// we create 4 expectations:
	// - "tf-operator/tfjob-abc/ps/services", expects 2 adds.
	// - "tf-operator/tfjob-abc/ps/pods", expects 2 adds.
	// - "tf-operator/tfjob-abc/worker/services", expects 4 adds.
	// - "tf-operator/tfjob-abc/worker/pods", expects 4 adds.
	Expectations controller.ControllerExpectationsInterface

	// WorkQueue is a rate-limited work queue. Work is queued and processed later
	// instead of immediately when a change happens. This bounds how many
	// resources are processed at a time and ensures the same item is never
	// processed simultaneously by two different workers.
	WorkQueue workqueue.RateLimitingInterface

	// Recorder is an event recorder for recording Event resources to the
	// Kubernetes API.
	Recorder record.EventRecorder

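	// Option holds the operator's server options.
	Option *options.ServerOption

	// SharePodClientSet is a clientset for the SharePod API.
	SharePodClientSet kubeshareclientset.Interface

	// SharePodLister can list/get SharePods from the shared informer's store.
	SharePodLister kubesharelisters.SharePodLister

	// SharePodInformerSynced returns true if the SharePod store has been synced at least once.
	SharePodInformerSynced cache.InformerSynced

	// HighPrioritySharePodsQueue holds SharePods awaiting a high-priority sync;
	// access to it is guarded by HighPrioritySharePodsQueueMutex.
	HighPrioritySharePodsQueue      *[]*kubesharev1.SharePod
	HighPrioritySharePodsQueueMutex *sync.Mutex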
}

JobController provides the shared logic that operators use to manage the lifecycle of Jobs.
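The expectation bookkeeping described in the `Expectations` field comment can be modeled with a few lines of stdlib Go. This is a simplified, hypothetical stand-in for `controller.ControllerExpectationsInterface`: it tracks only pending creations per key and omits the TTL and deletion tracking of the real interface. A job is only re-synced once every expected add for a key has been observed by the informer.

```go
package main

import (
	"fmt"
	"sync"
)

// expectations is a hypothetical, simplified model of per-key pending creations.
type expectations struct {
	mu   sync.Mutex
	adds map[string]int
}

func newExpectations() *expectations {
	return &expectations{adds: make(map[string]int)}
}

// ExpectCreations records that n creations are expected for key.
func (e *expectations) ExpectCreations(key string, n int) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.adds[key] = n
}

// CreationObserved lowers the pending count when an informer reports an add.
func (e *expectations) CreationObserved(key string) {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.adds[key] > 0 {
		e.adds[key]--
	}
}

// Satisfied reports whether every expected creation for key has been observed.
func (e *expectations) Satisfied(key string) bool {
	e.mu.Lock()
	defer e.mu.Unlock()
	return e.adds[key] <= 0
}

func main() {
	exp := newExpectations()
	// The four keys from the TFJob example: 2 PS replicas and 4 workers.
	want := map[string]int{
		"tf-operator/tfjob-abc/ps/services":     2,
		"tf-operator/tfjob-abc/ps/pods":         2,
		"tf-operator/tfjob-abc/worker/services": 4,
		"tf-operator/tfjob-abc/worker/pods":     4,
	}
	for k, n := range want {
		exp.ExpectCreations(k, n)
	}
	exp.CreationObserved("tf-operator/tfjob-abc/ps/pods")
	exp.CreationObserved("tf-operator/tfjob-abc/ps/pods")
	fmt.Println(exp.Satisfied("tf-operator/tfjob-abc/ps/pods"))     // true
	fmt.Println(exp.Satisfied("tf-operator/tfjob-abc/worker/pods")) // false
}
```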

func NewJobController

func NewJobController(
	controllerImpl ControllerInterface,
	reconcilerSyncPeriod metav1.Duration,
	enableGangScheduling bool,
	kubeClientSet kubeclientset.Interface,
	kubeshareClientSet kubeshareclientset.Interface,
	kubeBatchClientSet kubebatchclient.Interface,
	kubeInformerFactory kubeinformers.SharedInformerFactory,
	option *options.ServerOption,
	workQueueName string) JobController

func (*JobController) AddPod

func (jc *JobController) AddPod(obj interface{})

When a pod is created, enqueue the job that manages it and update its expectations.

func (*JobController) AddService

func (jc *JobController) AddService(obj interface{})

When a service is created, enqueue the job that manages it and update its expectations.

func (*JobController) AddSharePod

func (jc *JobController) AddSharePod(obj interface{})

AddSharePod is the SharePod counterpart of AddPod.

func (*JobController) DeletePdb

func (jc *JobController) DeletePdb(job metav1.Object) error

func (*JobController) DeletePod

func (jc *JobController) DeletePod(obj interface{})

When a pod is deleted, enqueue the job that manages the pod and update its expectations. obj can be a *v1.Pod or a cache.DeletedFinalStateUnknown tombstone.
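Delete handlers must accept both the object itself and a tombstone: when an informer misses a delete event, client-go delivers a `cache.DeletedFinalStateUnknown` value whose `Obj` field holds the last known state. The stdlib-only sketch below uses hypothetical stand-ins for `*v1.Pod` and the tombstone type, and a hypothetical helper `podFromDeleteEvent`; a real handler would go on to lower expectations and enqueue the owning job.

```go
package main

import "fmt"

// Pod is a hypothetical, trimmed-down stand-in for *v1.Pod.
type Pod struct {
	Namespace, Name string
}

// DeletedFinalStateUnknown mirrors the shape of client-go's tombstone marker.
type DeletedFinalStateUnknown struct {
	Key string
	Obj interface{}
}

// podFromDeleteEvent unwraps either a *Pod or a tombstone that wraps a *Pod.
func podFromDeleteEvent(obj interface{}) (*Pod, error) {
	switch o := obj.(type) {
	case *Pod:
		return o, nil
	case DeletedFinalStateUnknown:
		pod, ok := o.Obj.(*Pod)
		if !ok {
			return nil, fmt.Errorf("tombstone %q contained an object that is not a Pod: %#v", o.Key, o.Obj)
		}
		return pod, nil
	default:
		return nil, fmt.Errorf("couldn't get object from event: %#v", obj)
	}
}

func main() {
	p := &Pod{Namespace: "default", Name: "tfjob-abc-worker-0"}
	events := []interface{}{p, DeletedFinalStateUnknown{Key: "default/tfjob-abc-worker-0", Obj: p}, "junk"}
	for _, ev := range events {
		pod, err := podFromDeleteEvent(ev)
		fmt.Println(pod, err)
	}
}
```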

func (*JobController) DeletePodGroup

func (jc *JobController) DeletePodGroup(object runtime.Object) error

func (*JobController) DeleteService

func (jc *JobController) DeleteService(obj interface{})

When a service is deleted, enqueue the job that manages the service and update its expectations. obj can be a *v1.Service or a cache.DeletedFinalStateUnknown tombstone.

func (*JobController) DeleteSharePod

func (jc *JobController) DeleteSharePod(obj interface{})

DeleteSharePod is the SharePod counterpart of DeletePod.

func (*JobController) FilterPodsForReplicaType

func (jc *JobController) FilterPodsForReplicaType(pods []*v1.Pod, replicaType string) ([]*v1.Pod, error)

FilterPodsForReplicaType returns the pods that belong to the given replica type.
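Filtering by replica type amounts to matching one label. The sketch below is a simplified, hypothetical model: `Pod` carries only a name and labels, the label key `"replica-type"` is made up (the package would obtain it from `ControllerInterface.GetReplicaTypeLabelKey()`), and the error return of the real method is dropped.

```go
package main

import "fmt"

// Pod is a hypothetical stand-in for *v1.Pod that carries only a name and labels.
type Pod struct {
	Name   string
	Labels map[string]string
}

// replicaTypeLabelKey is a hypothetical label key used for illustration.
const replicaTypeLabelKey = "replica-type"

// filterPodsForReplicaType keeps only the pods whose replica-type label equals replicaType.
func filterPodsForReplicaType(pods []*Pod, replicaType string) []*Pod {
	var result []*Pod
	for _, p := range pods {
		if p.Labels[replicaTypeLabelKey] == replicaType {
			result = append(result, p)
		}
	}
	return result
}

func main() {
	pods := []*Pod{
		{Name: "tfjob-abc-ps-0", Labels: map[string]string{replicaTypeLabelKey: "ps"}},
		{Name: "tfjob-abc-worker-0", Labels: map[string]string{replicaTypeLabelKey: "worker"}},
		{Name: "tfjob-abc-worker-1", Labels: map[string]string{replicaTypeLabelKey: "worker"}},
	}
	for _, p := range filterPodsForReplicaType(pods, "worker") {
		fmt.Println(p.Name)
	}
}
```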

func (*JobController) FilterServicesForReplicaType

func (jc *JobController) FilterServicesForReplicaType(services []*v1.Service, replicaType string) ([]*v1.Service, error)

FilterServicesForReplicaType returns the services that belong to the given replica type.

func (*JobController) FilterSharePodsForReplicaType

func (jc *JobController) FilterSharePodsForReplicaType(pods []*kubesharev1.SharePod, replicaType string) ([]*kubesharev1.SharePod, error)

FilterSharePodsForReplicaType is the SharePod counterpart of FilterPodsForReplicaType.

func (*JobController) GenLabels

func (jc *JobController) GenLabels(jobName string) map[string]string
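GenLabels is undocumented, but `ControllerInterface` exposes the group-name and job-name label keys plus the group-name value, which suggests how a job's label set is assembled. The sketch below assumes exactly that: `labelSource` is a hypothetical subset of `ControllerInterface`, `demoController` and its label keys and values are made up, and replacing `/` in the job name is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// labelSource is a hypothetical subset of ControllerInterface.
type labelSource interface {
	GetGroupNameLabelKey() string
	GetJobNameLabelKey() string
	GetGroupNameLabelValue() string
}

// demoController is a hypothetical implementation with made-up label keys.
type demoController struct{}

func (demoController) GetGroupNameLabelKey() string   { return "group-name" }
func (demoController) GetJobNameLabelKey() string     { return "job-name" }
func (demoController) GetGroupNameLabelValue() string { return "kubeflow.org" }

// genLabels builds the label set stamped on a job's pods and services.
// Replacing "/" with "-" keeps a namespace/name key usable as a label value (assumption).
func genLabels(c labelSource, jobName string) map[string]string {
	return map[string]string{
		c.GetGroupNameLabelKey(): c.GetGroupNameLabelValue(),
		c.GetJobNameLabelKey():   strings.ReplaceAll(jobName, "/", "-"),
	}
}

func main() {
	fmt.Println(genLabels(demoController{}, "tf-operator/tfjob-abc"))
}
```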

func (*JobController) GenOwnerReference

func (jc *JobController) GenOwnerReference(obj metav1.Object) *metav1.OwnerReference

func (*JobController) GetPodSlices

func (jc *JobController) GetPodSlices(pods []*v1.Pod, replicas int, logger *log.Entry) map[string][]*v1.Pod

GetPodSlices groups the given pods into slices and returns them as a map from a string key to the slice of pods for that key.

func (*JobController) GetPodsForJob

func (jc *JobController) GetPodsForJob(job metav1.Object) ([]*v1.Pod, error)

GetPodsForJob returns the set of pods that this job should manage. It also reconciles ControllerRef by adopting/orphaning. Note that the returned Pods are pointers into the cache.

func (*JobController) GetServiceSlices

func (jc *JobController) GetServiceSlices(services []*v1.Service, replicas int, logger *log.Entry) [][]*v1.Service

GetServiceSlices groups services by replica: for a returned value serviceSlices, serviceSlices[i] holds pointers to the services for replica i.
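Grouping by replica means bucketing each service by its replica-index label. This stdlib-only sketch assumes a hypothetical `Service` type and a made-up `"replica-index"` label key (the package would obtain the key from `ControllerInterface.GetReplicaIndexLabelKey()`), and skips services whose index is missing, malformed, or out of range; whether the real method handles those cases the same way is an assumption.

```go
package main

import (
	"fmt"
	"log"
	"strconv"
)

// Service is a hypothetical stand-in for *v1.Service.
type Service struct {
	Name   string
	Labels map[string]string
}

// replicaIndexLabelKey is a hypothetical label key used for illustration.
const replicaIndexLabelKey = "replica-index"

// getServiceSlices returns slices where slices[i] holds the services for replica i.
func getServiceSlices(services []*Service, replicas int) [][]*Service {
	slices := make([][]*Service, replicas)
	for _, s := range services {
		raw, ok := s.Labels[replicaIndexLabelKey]
		if !ok {
			log.Printf("service %s has no index label; skipping", s.Name)
			continue
		}
		index, err := strconv.Atoi(raw)
		if err != nil || index < 0 || index >= replicas {
			log.Printf("service %s has unexpected index %q; skipping", s.Name, raw)
			continue
		}
		slices[index] = append(slices[index], s)
	}
	return slices
}

func main() {
	svcs := []*Service{
		{Name: "tfjob-abc-worker-0", Labels: map[string]string{replicaIndexLabelKey: "0"}},
		{Name: "tfjob-abc-worker-1", Labels: map[string]string{replicaIndexLabelKey: "1"}},
		{Name: "stray", Labels: map[string]string{replicaIndexLabelKey: "7"}},
	}
	for i, slice := range getServiceSlices(svcs, 2) {
		for _, s := range slice {
			fmt.Printf("replica %d: %s\n", i, s.Name)
		}
	}
}
```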

func (*JobController) GetServicesForJob

func (jc *JobController) GetServicesForJob(job metav1.Object) ([]*v1.Service, error)

GetServicesForJob returns the set of services that this job should manage. It also reconciles ControllerRef by adopting/orphaning. Note that the returned services are pointers into the cache.

func (*JobController) GetSharePodSlices

func (jc *JobController) GetSharePodSlices(pods []*kubesharev1.SharePod, replicas int, logger *log.Entry) map[string][]*kubesharev1.SharePod

GetSharePodSlices is the SharePod counterpart of GetPodSlices.

func (*JobController) GetSharePodsForJob

func (jc *JobController) GetSharePodsForJob(job metav1.Object) ([]*kubesharev1.SharePod, error)

GetSharePodsForJob is the SharePod counterpart of GetPodsForJob.

func (*JobController) SyncPdb

func (jc *JobController) SyncPdb(job metav1.Object, minAvailableReplicas int32) (*v1beta1.PodDisruptionBudget, error)

SyncPdb creates a PodDisruptionBudget (PDB) for gang scheduling by kube-arbitrator.

func (*JobController) SyncPodGroup

func (jc *JobController) SyncPodGroup(job metav1.Object, minAvailableReplicas int32) (*v1alpha1.PodGroup, error)

func (*JobController) UpdatePod

func (jc *JobController) UpdatePod(old, cur interface{})

When a pod is updated, figure out which job(s) manage it and wake them up. If the pod's labels have changed, both the old and the new owning job must be woken up. old and cur must be *v1.Pod types.
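The "wake both owners" rule can be sketched for the case where ownership is tracked through a controller reference. This is a hypothetical stdlib-only model (`ControllerRef`, `Pod`, and `jobsToWake` are made up) that covers only the owner-change part; label-based matching of orphan pods is omitted.

```go
package main

import "fmt"

// ControllerRef is a hypothetical, trimmed-down owner reference.
type ControllerRef struct {
	Kind, Name string
}

// Pod is a hypothetical stand-in for *v1.Pod.
type Pod struct {
	Namespace  string
	Controller *ControllerRef // nil when the pod is an orphan
}

// jobsToWake returns the namespace/name keys of the jobs to re-sync after an update:
// the previous owner if ownership changed, plus the current owner if there is one.
func jobsToWake(old, cur *Pod) []string {
	var keys []string
	changed := (old.Controller == nil) != (cur.Controller == nil) ||
		(old.Controller != nil && *old.Controller != *cur.Controller)
	if changed && old.Controller != nil {
		keys = append(keys, old.Namespace+"/"+old.Controller.Name)
	}
	if cur.Controller != nil {
		keys = append(keys, cur.Namespace+"/"+cur.Controller.Name)
	}
	return keys
}

func main() {
	a := &ControllerRef{Kind: "TFJob", Name: "tfjob-a"}
	b := &ControllerRef{Kind: "TFJob", Name: "tfjob-b"}
	old := &Pod{Namespace: "default", Controller: a}
	cur := &Pod{Namespace: "default", Controller: b}
	fmt.Println(jobsToWake(old, cur))
}
```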

func (*JobController) UpdateService

func (jc *JobController) UpdateService(old, cur interface{})

When a service is updated, figure out which job(s) manage it and wake them up. If the service's labels have changed, both the old and the new owning job must be woken up. old and cur must be *v1.Service types.

func (*JobController) UpdateSharePod

func (jc *JobController) UpdateSharePod(old, cur interface{})

UpdateSharePod is the SharePod counterpart of UpdatePod.

type JobControllerConfiguration

type JobControllerConfiguration struct {
	// ReconcilerSyncLoopPeriod is how long the reconciler's sync loop waits
	// between two reconciler syncs.
	// It is set to 15 sec by default.
	// TODO(cph): maybe we can let it grow by a multiple in the future,
	// up to 5 minutes, to reduce idle loops,
	// e.g. 15s, 30s, 60s, 120s...
	ReconcilerSyncLoopPeriod metav1.Duration

	// EnableGangScheduling enables gang scheduling by kube-arbitrator.
	EnableGangScheduling bool
}

JobControllerConfiguration contains the operator's configuration.
