// Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately.
func (sched *Scheduler) Run() {
    if !sched.config.WaitForCacheSync() {
        return
    }

    go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}
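The call to wait.Until is what turns scheduleOne into the scheduling loop: with a period of 0 the function is invoked again as soon as the previous invocation returns, and the loop ends only when StopEverything is closed. The stand-alone sketch below (not scheduler code; the counter and the sleeps are made up purely for illustration) shows that behaviour of wait.Until:

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    stop := make(chan struct{}) // plays the role of sched.config.StopEverything
    count := 0

    // With period 0, wait.Until re-invokes the function immediately after it
    // returns, just like the scheduleOne loop started by Run().
    go wait.Until(func() {
        count++
        fmt.Println("iteration", count)
        time.Sleep(100 * time.Millisecond) // stand-in for one scheduling pass
    }, 0, stop)

    time.Sleep(500 * time.Millisecond)
    close(stop) // closing the channel stops the loop
    time.Sleep(100 * time.Millisecond)
}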
// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne() {
    pod := sched.config.NextPod()
    if pod.DeletionTimestamp != nil {
        sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        glog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        return
    }

    glog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)

    // Synchronously attempt to find a fit for the pod.
    start := time.Now()
    suggestedHost, err := sched.schedule(pod)
    if err != nil {
        // schedule() may have failed because the pod would not fit on any host, so we try to
        // preempt, with the expectation that the next time the pod is tried for scheduling it
        // will fit due to the preemption. It is also possible that a different pod will schedule
        // into the resources that were preempted, but this is harmless.
        if fitError, ok := err.(*core.FitError); ok {
            preemptionStartTime := time.Now()
            sched.preempt(pod, fitError)
            metrics.PreemptionAttempts.Inc()
            metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime))
            metrics.SchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
        }
        return
    }
    metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))

    // Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
    // This allows us to keep scheduling without waiting on binding to occur.
    assumedPod := pod.DeepCopy()

    // Assume volumes first before assuming the pod.
    //
    // If all volumes are completely bound, then allBound is true and binding will be skipped.
    //
    // Otherwise, binding of volumes is started after the pod is assumed, but before pod binding.
    //
    // This function modifies 'assumedPod' if volume binding is required.
    allBound, err := sched.assumeVolumes(assumedPod, suggestedHost)
    if err != nil {
        return
    }
    // assume modifies `assumedPod` by setting NodeName=suggestedHost
    err = sched.assume(assumedPod, suggestedHost)
    if err != nil {
        return
    }
    // bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
    go func() {
        // Bind volumes first before Pod
        if !allBound {
            err = sched.bindVolumes(assumedPod)
            if err != nil {
                return
            }
        }
        // ... sched.bind(...) is called next; the full goroutine is shown in the bind section below.
    }()
}
func (c *configFactory) getNextPod() *v1.Pod {
    pod, err := c.podQueue.Pop()
    if err == nil {
        glog.V(4).Infof("About to try and schedule pod %v/%v", pod.Namespace, pod.Name)
        return pod
    }
    glog.Errorf("Error while retrieving next pod from scheduling queue: %v", err)
    return nil
}
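NextPod blocks: podQueue.Pop does not return until a pod is available, so the scheduleOne loop simply parks while the queue is empty. A rough sketch of that blocking-pop behaviour, using a plain channel as a stand-in for the real scheduling queue (which additionally handles priorities and retries), might look like this:

package main

import (
    "fmt"
    "time"
)

// toyQueue is a hypothetical stand-in for the scheduler's pod queue: Pop blocks
// until an item has been Added, which is the property getNextPod relies on.
type toyQueue struct {
    items chan string
}

func newToyQueue() *toyQueue {
    return &toyQueue{items: make(chan string, 128)}
}

func (q *toyQueue) Add(podName string) { q.items <- podName }

// Pop blocks until a pod name is available.
func (q *toyQueue) Pop() string { return <-q.items }

func main() {
    q := newToyQueue()

    go func() {
        time.Sleep(200 * time.Millisecond)
        q.Add("default/nginx") // a pod shows up later
    }()

    fmt.Println("waiting for next pod...")
    fmt.Println("got pod:", q.Pop()) // blocks until Add happens
}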
4. Scheduler.schedule
This part of the code is located in pkg/scheduler/scheduler.go.
This is the core of the scheduling logic: it selects the most suitable node for a given pod by running the configured algorithms.
// Synchronously attempt to find a fit for the pod.
start := time.Now()
suggestedHost, err := sched.schedule(pod)
if err != nil {
    // schedule() may have failed because the pod would not fit on any host, so we try to
    // preempt, with the expectation that the next time the pod is tried for scheduling it
    // will fit due to the preemption. It is also possible that a different pod will schedule
    // into the resources that were preempted, but this is harmless.
    if fitError, ok := err.(*core.FitError); ok {
        preemptionStartTime := time.Now()
        sched.preempt(pod, fitError)
        metrics.PreemptionAttempts.Inc()
        metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime))
        metrics.SchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
    }
    return
}
// ScheduleAlgorithm is an interface implemented by things that know how to schedule pods
// onto machines.
type ScheduleAlgorithm interface {
    Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
    // Preempt receives scheduling errors for a pod and tries to create room for
    // the pod by preempting lower priority pods if possible.
    // It returns the node where preemption happened, a list of preempted pods, a
    // list of pods whose nominated node name should be removed, and error if any.
    Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
    // Predicates() returns a pointer to a map of predicate functions. This is
    // exposed for testing.
    Predicates() map[string]FitPredicate
    // Prioritizers returns a slice of priority config. This is exposed for
    // testing.
    Prioritizers() []PriorityConfig
}
// Schedule tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError error with reasons.
func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {
    trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", pod.Namespace, pod.Name))
    defer trace.LogIfLong(100 * time.Millisecond)
trace.Step("Prioritizing") startPriorityEvalTime := time.Now() // When only one node after predicate, just use it. iflen(filteredNodes) == 1 { metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime)) return filteredNodes[0].Name, nil }
// podPassesBasicChecks makes sanity checks on the pod if it can be scheduled.
func podPassesBasicChecks(pod *v1.Pod, pvcLister corelisters.PersistentVolumeClaimLister) error {
    // Check PVCs used by the pod
    namespace := pod.Namespace
    manifest := &(pod.Spec)
    for i := range manifest.Volumes {
        volume := &manifest.Volumes[i]
        if volume.PersistentVolumeClaim == nil {
            // Volume is not a PVC, ignore
            continue
        }
        pvcName := volume.PersistentVolumeClaim.ClaimName
        pvc, err := pvcLister.PersistentVolumeClaims(namespace).Get(pvcName)
        if err != nil {
            // The error has already enough context ("persistentvolumeclaim "myclaim" not found")
            return err
        }

        if pvc.DeletionTimestamp != nil {
            return fmt.Errorf("persistentvolumeclaim %q is being deleted", pvc.Name)
        }
    }

    return nil
}
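The check walks every volume of the pod and rejects the pod early if a referenced PVC is missing or being deleted. A simplified, self-contained sketch of the same loop, where a plain map replaces the PVC lister (an assumption made only so the example runs on its own):

package main

import (
    "fmt"
    "time"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// basicPVCCheck mimics the loop in podPassesBasicChecks, but looks PVCs up in a
// map instead of a real PersistentVolumeClaimLister.
func basicPVCCheck(pod *v1.Pod, pvcs map[string]*v1.PersistentVolumeClaim) error {
    for i := range pod.Spec.Volumes {
        volume := &pod.Spec.Volumes[i]
        if volume.PersistentVolumeClaim == nil {
            continue // not a PVC-backed volume
        }
        name := volume.PersistentVolumeClaim.ClaimName
        pvc, ok := pvcs[name]
        if !ok {
            return fmt.Errorf("persistentvolumeclaim %q not found", name)
        }
        if pvc.DeletionTimestamp != nil {
            return fmt.Errorf("persistentvolumeclaim %q is being deleted", pvc.Name)
        }
    }
    return nil
}

func main() {
    now := metav1.NewTime(time.Now())
    pvcs := map[string]*v1.PersistentVolumeClaim{
        "data": {ObjectMeta: metav1.ObjectMeta{Name: "data", DeletionTimestamp: &now}},
    }
    pod := &v1.Pod{Spec: v1.PodSpec{Volumes: []v1.Volume{{
        Name:         "data",
        VolumeSource: v1.VolumeSource{PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: "data"}},
    }}}}

    fmt.Println(basicPVCCheck(pod, pvcs)) // persistentvolumeclaim "data" is being deleted
}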
// selectHost takes a prioritized list of nodes and then picks one
// in a round-robin manner from the nodes that had the highest score.
func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList) (string, error) {
    if len(priorityList) == 0 {
        return "", fmt.Errorf("empty priorityList")
    }

    maxScores := findMaxScores(priorityList)
    ix := int(g.lastNodeIndex % uint64(len(maxScores)))
    g.lastNodeIndex++

    return priorityList[maxScores[ix]].Host, nil
}
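The interesting detail is the tie-breaking: lastNodeIndex is a counter that keeps incrementing across calls, and taking it modulo the number of top-scoring nodes spreads ties evenly in round-robin fashion. A small illustration of just that index arithmetic, with a local counter standing in for g.lastNodeIndex:

package main

import "fmt"

func main() {
    // Suppose three nodes tied for the highest score.
    tied := []string{"node-a", "node-b", "node-c"}

    var lastNodeIndex uint64 // plays the role of genericScheduler.lastNodeIndex
    for call := 0; call < 5; call++ {
        ix := int(lastNodeIndex % uint64(len(tied)))
        lastNodeIndex++
        fmt.Printf("call %d picks %s\n", call, tied[ix])
    }
    // Output cycles node-a, node-b, node-c, node-a, node-b: ties are spread round-robin.
}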
5.4.1. findMaxScores
findMaxScores returns the indexes of the nodes in priorityList that have the highest Score.
// findMaxScores returns the indexes of nodes in the "priorityList" that has the highest "Score".
func findMaxScores(priorityList schedulerapi.HostPriorityList) []int {
    maxScoreIndexes := make([]int, 0, len(priorityList)/2)
    maxScore := priorityList[0].Score
    for i, hp := range priorityList {
        if hp.Score > maxScore {
            maxScore = hp.Score
            maxScoreIndexes = maxScoreIndexes[:0]
            maxScoreIndexes = append(maxScoreIndexes, i)
        } else if hp.Score == maxScore {
            maxScoreIndexes = append(maxScoreIndexes, i)
        }
    }
    return maxScoreIndexes
}
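For example, given the scores [5, 9, 9, 3, 9] the function returns the indexes [1, 2, 4]. The sketch below uses a local copy of the same logic over plain ints so it can be run on its own:

package main

import "fmt"

// findMaxScores re-implemented over a plain []int so the example is self-contained.
func findMaxScores(scores []int) []int {
    maxScoreIndexes := make([]int, 0, len(scores)/2)
    maxScore := scores[0]
    for i, s := range scores {
        if s > maxScore {
            maxScore = s
            maxScoreIndexes = maxScoreIndexes[:0]
            maxScoreIndexes = append(maxScoreIndexes, i)
        } else if s == maxScore {
            maxScoreIndexes = append(maxScoreIndexes, i)
        }
    }
    return maxScoreIndexes
}

func main() {
    fmt.Println(findMaxScores([]int{5, 9, 9, 3, 9})) // [1 2 4]
}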
suggestedHost, err := sched.schedule(pod)
if err != nil {
    // schedule() may have failed because the pod would not fit on any host, so we try to
    // preempt, with the expectation that the next time the pod is tried for scheduling it
    // will fit due to the preemption. It is also possible that a different pod will schedule
    // into the resources that were preempted, but this is harmless.
    if fitError, ok := err.(*core.FitError); ok {
        preemptionStartTime := time.Now()
        sched.preempt(pod, fitError)
        metrics.PreemptionAttempts.Inc()
        metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime))
        metrics.SchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
    }
    return
}
// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
// This allows us to keep scheduling without waiting on binding to occur.
assumedPod := pod.DeepCopy()
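The DeepCopy matters because the pod popped from the queue ultimately comes from the informer cache, which is shared and must not be mutated; the scheduler therefore mutates only its own copy. A tiny illustration of what DeepCopy guarantees (the pod object here is constructed by hand just for the example):

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    cached := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "nginx", Namespace: "default"}}

    assumed := cached.DeepCopy()
    assumed.Spec.NodeName = "node-b" // mutate only the copy, as scheduleOne does

    fmt.Println(cached.Spec.NodeName)  // "" - the shared object is untouched
    fmt.Println(assumed.Spec.NodeName) // "node-b"
}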
// Assume volumes first before assuming the pod.
//
// If all volumes are completely bound, then allBound is true and binding will be skipped.
//
// Otherwise, binding of volumes is started after the pod is assumed, but before pod binding.
//
// This function modifies 'assumedPod' if volume binding is required.
allBound, err := sched.assumeVolumes(assumedPod, suggestedHost)
if err != nil {
    return
}
// assume modifies `assumedPod` by setting NodeName=suggestedHost
err = sched.assume(assumedPod, suggestedHost)
if err != nil {
    return
}
// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
// assume modifies `assumed`.
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
    // Optimistically assume that the binding will succeed and send it to apiserver
    // in the background.
    // If the binding fails, scheduler will release resources allocated to assumed pod
    // immediately.
    assumed.Spec.NodeName = host
    // NOTE: Because the scheduler uses snapshots of SchedulerCache and the live
    // version of Ecache, updates must be written to SchedulerCache before
    // invalidating Ecache.
    if err := sched.config.SchedulerCache.AssumePod(assumed); err != nil {
        glog.Errorf("scheduler cache AssumePod failed: %v", err)

        // This is most probably result of a BUG in retrying logic.
        // We report an error here so that pod scheduling can be retried.
        // This relies on the fact that Error will check if the pod has been bound
        // to a node and if so will not add it back to the unscheduled pods queue
        // (otherwise this would cause an infinite loop).
        sched.config.Error(assumed, err)
        sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "AssumePod failed: %v", err)
        sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
            Type:    v1.PodScheduled,
            Status:  v1.ConditionFalse,
            Reason:  "SchedulerError",
            Message: err.Error(),
        })
        return err
    }

    // Optimistically assume that the binding will succeed, so we need to invalidate affected
    // predicates in equivalence cache.
    // If the binding fails, these invalidated item will not break anything.
    if sched.config.Ecache != nil {
        sched.config.Ecache.InvalidateCachedPredicateItemForPodAdd(assumed, host)
    }
    return nil
}
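The pattern here is optimistic concurrency: the pod is written into the scheduler cache as if it were already running on the node, so the next scheduling cycle sees its resource usage immediately; if binding later fails, the cache entry is forgotten again (see bind below). A minimal toy cache that captures this assume/forget idea, under the assumption that a map of node name to requested millicores is enough for illustration:

package main

import "fmt"

// toyCache is a hypothetical, minimal stand-in for SchedulerCache: it only tracks
// how much CPU has been assumed per node.
type toyCache struct {
    assumedMilliCPU map[string]int
}

func newToyCache() *toyCache { return &toyCache{assumedMilliCPU: map[string]int{}} }

// AssumePod optimistically reserves the pod's resources on the node.
func (c *toyCache) AssumePod(node string, milliCPU int) {
    c.assumedMilliCPU[node] += milliCPU
}

// ForgetPod releases the reservation, e.g. when binding failed.
func (c *toyCache) ForgetPod(node string, milliCPU int) {
    c.assumedMilliCPU[node] -= milliCPU
}

func main() {
    cache := newToyCache()

    cache.AssumePod("node-b", 800) // assume succeeds; scheduling of other pods continues
    fmt.Println(cache.assumedMilliCPU["node-b"]) // 800

    // Later, the asynchronous bind fails, so the reservation is rolled back.
    cache.ForgetPod("node-b", 800)
    fmt.Println(cache.assumedMilliCPU["node-b"]) // 0
}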
// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() {
    // Bind volumes first before Pod
    if !allBound {
        err = sched.bindVolumes(assumedPod)
        if err != nil {
            return
        }
    }

    err := sched.bind(assumedPod, &v1.Binding{
        ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
        Target: v1.ObjectReference{
            Kind: "Node",
            Name: suggestedHost,
        },
    })
    metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
    if err != nil {
        glog.Errorf("Internal error binding pod: (%v)", err)
    }
}()
// bind binds a pod to a given node defined in a binding object. We expect this to run asynchronously, so we
// handle binding metrics internally.
func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error {
    bindingStart := time.Now()
    // If binding succeeded then PodScheduled condition will be updated in apiserver so that
    // it's atomic with setting host.
    err := sched.config.GetBinder(assumed).Bind(b)
    if err := sched.config.SchedulerCache.FinishBinding(assumed); err != nil {
        glog.Errorf("scheduler cache FinishBinding failed: %v", err)
    }
    if err != nil {
        glog.V(1).Infof("Failed to bind pod: %v/%v", assumed.Namespace, assumed.Name)
        if err := sched.config.SchedulerCache.ForgetPod(assumed); err != nil {
            glog.Errorf("scheduler cache ForgetPod failed: %v", err)
        }
        sched.config.Error(assumed, err)
        sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "Binding rejected: %v", err)
        sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
            Type:   v1.PodScheduled,
            Status: v1.ConditionFalse,
            Reason: "BindingRejected",
        })
        return err
    }