Package node contains code for syncing cloud instances with the node registry.



    View Source
    const (
    	NodeControllerSubsystem = "node_collector"
    	ZoneHealthStatisticKey  = "zone_health"
    	ZoneSizeKey             = "zone_size"
    	ZoneNoUnhealthyNodesKey = "unhealty_nodes_in_zone"
    	EvictionsNumberKey      = "evictions_number"


    View Source
    var (
    	ZoneHealth = prometheus.NewGaugeVec(
    			Subsystem: NodeControllerSubsystem,
    			Name:      ZoneHealthStatisticKey,
    			Help:      "Gauge measuring percentage of healty nodes per zone.",
    	ZoneSize = prometheus.NewGaugeVec(
    			Subsystem: NodeControllerSubsystem,
    			Name:      ZoneSizeKey,
    			Help:      "Gauge measuring number of registered Nodes per zones.",
    	UnhealthyNodes = prometheus.NewGaugeVec(
    			Subsystem: NodeControllerSubsystem,
    			Name:      ZoneNoUnhealthyNodesKey,
    			Help:      "Gauge measuring number of not Ready Nodes per zones.",
    	EvictionsNumber = prometheus.NewCounterVec(
    			Subsystem: NodeControllerSubsystem,
    			Name:      EvictionsNumberKey,
    			Help:      "Number of Node evictions that happened since current instance of NodeController started.",
    View Source
    var (
    	ErrCloudInstance = errors.New("cloud provider doesn't support instances.")
    	UnreachableTaintTemplate = &v1.Taint{
    		Key:    metav1.TaintNodeUnreachable,
    		Effect: v1.TaintEffectNoExecute,
    	NotReadyTaintTemplate = &v1.Taint{
    		Key:    metav1.TaintNodeNotReady,
    		Effect: v1.TaintEffectNoExecute,


    func Register

    func Register()


    type ActionFunc

    type ActionFunc func(TimedValue) (bool, time.Duration)

      ActionFunc takes a timed value and returns false if the item must be retried, with an optional time.Duration if some minimum wait interval should be used.

      type CIDRAllocator

      type CIDRAllocator interface {
      	AllocateOrOccupyCIDR(node *v1.Node) error
      	ReleaseCIDR(node *v1.Node) error

        CIDRAllocator is an interface implemented by things that know how to allocate/occupy/recycle CIDR for nodes.

        func NewCIDRRangeAllocator

        func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, serviceCIDR *net.IPNet, subNetMaskSize int, nodeList *v1.NodeList) (CIDRAllocator, error)

          NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDRs for nodes. Caller must ensure subNetMaskSize is not less than cluster CIDR mask size. Caller must always pass in a list of existing nodes so the new allocator can initialize its CIDR map. NodeList is only nil in testing.

          type NoExecuteTaintManager

          type NoExecuteTaintManager struct {
          	// contains filtered or unexported fields

            NoExecuteTaintManager listens to Taint/Toleration changes and is responsible for removing Pods from Nodes tainted with NoExecute Taints.

            func NewNoExecuteTaintManager

            func NewNoExecuteTaintManager(c clientset.Interface) *NoExecuteTaintManager

              NewNoExecuteTaintManager creates a new NoExecuteTaintManager that will use passed clientset to communicate with the API server.

              func (*NoExecuteTaintManager) NodeUpdated

              func (tc *NoExecuteTaintManager) NodeUpdated(oldNode *v1.Node, newNode *v1.Node)

                NodeUpdated is used to notify NoExecuteTaintManager about Node changes.

                func (*NoExecuteTaintManager) PodUpdated

                func (tc *NoExecuteTaintManager) PodUpdated(oldPod *v1.Pod, newPod *v1.Pod)

                  PodUpdated is used to notify NoExecuteTaintManager about Pod changes.

                  func (*NoExecuteTaintManager) Run

                  func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{})

                    Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.

                    type NodeController

                    type NodeController struct {
                    	// contains filtered or unexported fields

                    func NewNodeController

                    func NewNodeController(
                    	podInformer coreinformers.PodInformer,
                    	nodeInformer coreinformers.NodeInformer,
                    	daemonSetInformer extensionsinformers.DaemonSetInformer,
                    	cloud cloudprovider.Interface,
                    	kubeClient clientset.Interface,
                    	podEvictionTimeout time.Duration,
                    	evictionLimiterQPS float32,
                    	secondaryEvictionLimiterQPS float32,
                    	largeClusterThreshold int32,
                    	unhealthyZoneThreshold float32,
                    	nodeMonitorGracePeriod time.Duration,
                    	nodeStartupGracePeriod time.Duration,
                    	nodeMonitorPeriod time.Duration,
                    	clusterCIDR *net.IPNet,
                    	serviceCIDR *net.IPNet,
                    	nodeCIDRMaskSize int,
                    	allocateNodeCIDRs bool,
                    	runTaintManager bool,
                    	useTaintBasedEvictions bool) (*NodeController, error)

                      NewNodeController returns a new node controller to sync instances from cloudprovider. This method returns an error if it is unable to initialize the CIDR bitmap with podCIDRs it has already allocated to nodes. Since we don't allow podCIDR changes currently, this should be handled as a fatal error.

                      func (*NodeController) ComputeZoneState

                      func (nc *NodeController) ComputeZoneState(nodeReadyConditions []*v1.NodeCondition) (int, zoneState)

                        This function is expected to get a slice of NodeReadyConditions for all Nodes in a given zone. The zone is considered: - fullyDisrupted if there are no Ready Nodes, - partiallyDisrupted if at least nc.unhealthyZoneThreshold percent of Nodes are not Ready, - normal otherwise.

                        func (*NodeController) HealthyQPSFunc

                        func (nc *NodeController) HealthyQPSFunc(nodeNum int) float32

                          Default value for cluster eviction rate - we take nodeNum for consistency with ReducedQPSFunc.

                          func (*NodeController) ReducedQPSFunc

                          func (nc *NodeController) ReducedQPSFunc(nodeNum int) float32

                            If the cluster is large make evictions slower, if they're small stop evictions altogether.

                            func (*NodeController) Run

                            func (nc *NodeController) Run()

                              Run starts an asynchronous loop that monitors the status of cluster nodes.

                              type RateLimitedTimedQueue

                              type RateLimitedTimedQueue struct {
                              	// contains filtered or unexported fields

                                RateLimitedTimedQueue is a unique item priority queue ordered by the expected next time of execution. It is also rate limited.

                                func NewRateLimitedTimedQueue

                                func NewRateLimitedTimedQueue(limiter flowcontrol.RateLimiter) *RateLimitedTimedQueue

                                  Creates new queue which will use given RateLimiter to oversee execution.

                                  func (*RateLimitedTimedQueue) Add

                                  func (q *RateLimitedTimedQueue) Add(value string, uid interface{}) bool

                                    Adds value to the queue to be processed. Won't add the same value (comparison by value) a second time if it was already added and not removed.

                                    func (*RateLimitedTimedQueue) Clear

                                    func (q *RateLimitedTimedQueue) Clear()

                                      Removes all items from the queue

                                      func (*RateLimitedTimedQueue) Remove

                                      func (q *RateLimitedTimedQueue) Remove(value string) bool

                                        Removes Node from the Evictor. The Node won't be processed until added again.

                                        func (*RateLimitedTimedQueue) SwapLimiter

                                        func (q *RateLimitedTimedQueue) SwapLimiter(newQPS float32)

                                          SwapLimiter safely swaps current limiter for this queue with the passed one if capacities or qps's differ.

                                          func (*RateLimitedTimedQueue) Try

                                          func (q *RateLimitedTimedQueue) Try(fn ActionFunc)

                                            Try processes the queue. Ends prematurely if RateLimiter forbids an action and leak is true. Otherwise, requeues the item to be processed. Each value is processed once if fn returns true, otherwise it is added back to the queue. The returned remaining is used to identify the minimum time to execute the next item in the queue. The same value is processed only once unless Remove is explicitly called on it (it's done by the cancelPodEviction function in NodeController when Node becomes Ready again). TODO: figure out a good way to do garbage collection for all Nodes that were removed from the cluster.

                                            type TimedQueue

                                            type TimedQueue []*TimedValue

                                              TimedQueue is a priority heap where the lowest ProcessAt is at the front of the queue

                                              func (TimedQueue) Len

                                              func (h TimedQueue) Len() int

                                              func (TimedQueue) Less

                                              func (h TimedQueue) Less(i, j int) bool

                                              func (*TimedQueue) Pop

                                              func (h *TimedQueue) Pop() interface{}

                                              func (*TimedQueue) Push

                                              func (h *TimedQueue) Push(x interface{})

                                              func (TimedQueue) Swap

                                              func (h TimedQueue) Swap(i, j int)

                                              type TimedValue

                                              type TimedValue struct {
                                              	Value string
                                              	// UID could be anything that helps identify the value
                                              	UID       interface{}
                                              	AddedAt   time.Time
                                              	ProcessAt time.Time

                                                TimedValue is a value that should be processed at a designated time.

                                                type TimedWorker

                                                type TimedWorker struct {
                                                	WorkItem  *WorkArgs
                                                	CreatedAt time.Time
                                                	FireAt    time.Time
                                                	Timer     *time.Timer

                                                  TimedWorker is responsible for executing a function no earlier than at FireAt time.

                                                  func CreateWorker

                                                  func CreateWorker(args *WorkArgs, createdAt time.Time, fireAt time.Time, f func(args *WorkArgs) error) *TimedWorker

                                                    CreateWorker creates a TimedWorker that will execute `f` not earlier than `fireAt`.

                                                    func (*TimedWorker) Cancel

                                                    func (w *TimedWorker) Cancel()

                                                      Cancel cancels the execution of function by the `TimedWorker`

                                                      type TimedWorkerQueue

                                                      type TimedWorkerQueue struct {
                                                      	// contains filtered or unexported fields

                                                        TimedWorkerQueue keeps a set of TimedWorkers that are still waiting for execution.

                                                        func CreateWorkerQueue

                                                        func CreateWorkerQueue(f func(args *WorkArgs) error) *TimedWorkerQueue

                                                          CreateWorkerQueue creates a new TimedWorkerQueue for workers that will execute given function `f`.

                                                          func (*TimedWorkerQueue) AddWork

                                                          func (q *TimedWorkerQueue) AddWork(args *WorkArgs, createdAt time.Time, fireAt time.Time)

                                                            AddWork adds a work to the WorkerQueue which will be executed not earlier than `fireAt`.

                                                            func (*TimedWorkerQueue) CancelWork

                                                            func (q *TimedWorkerQueue) CancelWork(key string) bool

                                                              CancelWork removes scheduled function execution from the queue. Returns true if work was cancelled.

                                                              func (*TimedWorkerQueue) GetWorkerUnsafe

                                                              func (q *TimedWorkerQueue) GetWorkerUnsafe(key string) *TimedWorker

                                                                GetWorkerUnsafe returns a TimedWorker corresponding to the given key. Unsafe method - workers have attached goroutines which can fire after this function is called.

                                                                type UniqueQueue

                                                                type UniqueQueue struct {
                                                                	// contains filtered or unexported fields

                                                                  A FIFO queue which additionally guarantees that any element can be added only once until it is removed.

                                                                  func (*UniqueQueue) Add

                                                                  func (q *UniqueQueue) Add(value TimedValue) bool

                                                                    Adds a new value to the queue if it wasn't added before, or was explicitly removed by the Remove call. Returns true if new value was added.

                                                                    func (*UniqueQueue) Clear

                                                                    func (q *UniqueQueue) Clear()

                                                                      Clear removes all items from the queue and duplication preventing set.

                                                                      func (*UniqueQueue) Get

                                                                      func (q *UniqueQueue) Get() (TimedValue, bool)

                                                                        Returns the oldest added value that wasn't returned yet.

                                                                        func (*UniqueQueue) Head

                                                                        func (q *UniqueQueue) Head() (TimedValue, bool)

                                                                          Head returns the oldest added value that wasn't returned yet without removing it.

                                                                          func (*UniqueQueue) Remove

                                                                          func (q *UniqueQueue) Remove(value string) bool

                                                                            Removes the value from the queue, so Get() call won't return it, and allow subsequent addition of the given value. If the value is not present does nothing and returns false.

                                                                            func (*UniqueQueue) RemoveFromQueue

                                                                            func (q *UniqueQueue) RemoveFromQueue(value string) bool

                                                                              Removes the value from the queue, but keeps it in the set, so it won't be added a second time. Returns true if something was removed.

                                                                              func (*UniqueQueue) Replace

                                                                              func (q *UniqueQueue) Replace(value TimedValue) bool

                                                                                Replace replaces an existing value in the queue if it already exists, otherwise it does nothing. Returns true if the item was found.

                                                                                type WorkArgs

                                                                                type WorkArgs struct {
                                                                                	NamespacedName types.NamespacedName

                                                                                  WorkArgs keeps arguments that will be passed to the function executed by the worker.

                                                                                  func NewWorkArgs

                                                                                  func NewWorkArgs(name, namespace string) *WorkArgs

                                                                                    NewWorkArgs is a helper function to create new `WorkArgs`

                                                                                    func (*WorkArgs) KeyFromWorkArgs

                                                                                    func (w *WorkArgs) KeyFromWorkArgs() string

                                                                                      KeyFromWorkArgs creates a key for the given `WorkArgs`


                                                                                      Path Synopsis