// Filters the nodes to find the ones that fit based on the given predicate functions
// Each node is passed through the predicate functions to determine if it is a fit
func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
	var filtered []*v1.Node
	failedPredicateMap := FailedPredicateMap{}
	// Create filtered list with enough space to avoid growing it
	// and allow assigning.
	filtered = make([]*v1.Node, numNodesToFind)
	errs := errors.MessageCountMap{}
	var (
		predicateResultLock sync.Mutex
		filteredLen         int32
		equivClass          *equivalence.Class
	)
	// Stops searching for more nodes once the configured number of feasible nodes
	// are found.
	workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)
// ParallelizeUntil is a framework that allows for parallelizing N
// independent pieces of work until done or the context is canceled.
func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc) {
	var stop <-chan struct{}
	if ctx != nil {
		stop = ctx.Done()
	}
	toProcess := make(chan int, pieces)
	for i := 0; i < pieces; i++ {
		toProcess <- i
	}
	close(toProcess)
	if pieces < workers {
		workers = pieces
	}
	wg := sync.WaitGroup{}
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer utilruntime.HandleCrash()
			defer wg.Done()
			for piece := range toProcess {
				select {
				case <-stop:
					return
				default:
					doWorkPiece(piece)
				}
			}
		}()
	}
	wg.Wait()
}
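To make the call in findNodesThatFit more concrete, here is a minimal, self-contained usage sketch. It assumes the import path k8s.io/client-go/util/workqueue for the package quoted above, and the node names and placeholder "predicate" are made up; the fixed parallelism of 16, the mutex around the result slice, and the atomic counter mirror the checkNode pattern in findNodesThatFit.

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	nodes := []string{"node-a", "node-b", "node-c", "node-d"} // made-up node names

	var (
		lock      sync.Mutex
		fitted    []string
		fittedLen int32
	)

	// checkNode stands in for running all predicate functions against nodes[i].
	checkNode := func(i int) {
		if len(nodes[i]) > 0 { // placeholder "predicate": every node fits
			lock.Lock()
			fitted = append(fitted, nodes[i])
			lock.Unlock()
			atomic.AddInt32(&fittedLen, 1)
		}
	}

	// 16 workers and one piece of work per node, like the scheduler's call.
	workqueue.ParallelizeUntil(context.Background(), 16, len(nodes), checkNode)

	fmt.Printf("%d nodes fit: %v\n", fittedLen, fitted)
}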
// podFitsOnNode checks whether a node given by NodeInfo satisfies the given predicate functions.
// For a given pod, podFitsOnNode will check if any equivalent pod exists and try to reuse its
// cached predicate results as much as possible.
// This function is called from two different places: Schedule and Preempt.
// When it is called from Schedule, we want to test whether the pod is schedulable
// on the node with all the existing pods on the node plus higher and equal priority
// pods nominated to run on the node.
// When it is called from Preempt, we should remove the victims of preemption and
// add the nominated pods. Removal of the victims is done by SelectVictimsOnNode().
// It removes victims from meta and NodeInfo before calling this function.
func podFitsOnNode(
	pod *v1.Pod,
	meta algorithm.PredicateMetadata,
	info *schedulercache.NodeInfo,
	predicateFuncs map[string]algorithm.FitPredicate,
	cache schedulercache.Cache,
	nodeCache *equivalence.NodeCache,
	queue SchedulingQueue,
	alwaysCheckAllPredicates bool,
	equivClass *equivalence.Class,
) (bool, []algorithm.PredicateFailureReason, error) {
	var (
		eCacheAvailable  bool
		failedPredicates []algorithm.PredicateFailureReason
	)
	podsAdded := false
	// We run predicates twice in some cases. If the node has greater or equal priority
	// nominated pods, we run them when those pods are added to meta and nodeInfo.
	// If all predicates succeed in this pass, we run them again when these
	// nominated pods are not added. This second pass is necessary because some
	// predicates such as inter-pod affinity may not pass without the nominated pods.
	// If there are no nominated pods for the node or if the first run of the
	// predicates fail, we don't run the second pass.
	// We consider only equal or higher priority pods in the first pass, because
	// those are the pods the current "pod" must yield to; it must not take a space
	// opened for running them. It is ok if the current "pod" takes resources freed
	// for lower priority pods.
	// Requiring that the new pod is schedulable in both circumstances ensures that
	// we are making a conservative decision: predicates like resources and inter-pod
	// anti-affinity are more likely to fail when the nominated pods are treated
	// as running, while predicates like pod affinity are more likely to fail when
	// the nominated pods are treated as not running. We can't just assume the
	// nominated pods are running because they are not running right now and in fact,
	// they may end up getting scheduled to a different node.
	for i := 0; i < 2; i++ {
		metaToUse := meta
		nodeInfoToUse := info
		if i == 0 {
			podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(util.GetPodPriority(pod), meta, info, queue)
		} else if !podsAdded || len(failedPredicates) != 0 {
			break
		}
		// Bypass eCache if node has any nominated pods.
		// TODO(bsalamat): consider using eCache and adding proper eCache invalidations
		// when pods are nominated or their nominations change.
		eCacheAvailable = equivClass != nil && nodeCache != nil && !podsAdded
		for _, predicateKey := range predicates.Ordering() {
			var (
				fit     bool
				reasons []algorithm.PredicateFailureReason
				err     error
			)
			//TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
			if predicate, exist := predicateFuncs[predicateKey]; exist {
				if eCacheAvailable {
					fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, pod, metaToUse, nodeInfoToUse, equivClass, cache)
				} else {
					fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
				}
				if err != nil {
					return false, []algorithm.PredicateFailureReason{}, err
				}
				if !fit {
					// eCache is available and valid, and predicates result is unfit, record the fail reasons
					failedPredicates = append(failedPredicates, reasons...)
					// if alwaysCheckAllPredicates is false, short circuit all predicates when one predicate fails.
					if !alwaysCheckAllPredicates {
						glog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate " +
							"evaluation is short circuited and there are chances " +
							"of other predicates failing as well.")
						break
					}
				}
			}
		}
	}
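The two-pass idea in the comment above can be boiled down to a few lines. The sketch below is illustrative only and not scheduler API: names like passesPredicates and nominatedPodsAdded are made up. Pass 0 evaluates the (stand-in) predicates with the higher/equal-priority nominated pods folded in, pass 1 repeats without them, and the pod counts as a fit only if both passes succeed.

package main

import "fmt"

// passesPredicates is a stand-in for running the ordered predicate functions;
// withNominated mimics using the meta/nodeInfo copies that had nominated pods added.
func passesPredicates(withNominated bool) bool {
	return true // pretend every predicate passes in this sketch
}

func main() {
	nominatedPodsAdded := true // in the real code this comes from addNominatedPods
	fits := true

	for i := 0; i < 2; i++ {
		if i == 1 && (!nominatedPodsAdded || !fits) {
			break // no second pass if nothing was added or the first pass already failed
		}
		if !passesPredicates(i == 0) {
			fits = false // a failure short-circuits, as with alwaysCheckAllPredicates=false
		}
	}
	fmt.Println("pod fits on node:", fits)
}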
// PodFitsResources checks if a node has sufficient resources, such as cpu, memory, gpu, opaque int resources etc to run a pod.
// First return value indicates whether a node has sufficient resources to run a pod while the second return value indicates the
// predicate failure reasons if the node has insufficient resources to run the pod.
func PodFitsResources(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}
	for rName, rQuant := range podRequest.ScalarResources {
		if v1helper.IsExtendedResourceName(rName) {
			// If this resource is one of the extended resources that should be
			// ignored, we will skip checking it.
			if ignoredExtendedResources.Has(string(rName)) {
				continue
			}
		}
		if allocatable.ScalarResources[rName] < rQuant+nodeInfo.RequestedResource().ScalarResources[rName] {
			predicateFails = append(predicateFails, NewInsufficientResourceError(rName, podRequest.ScalarResources[rName], nodeInfo.RequestedResource().ScalarResources[rName], allocatable.ScalarResources[rName]))
		}
	}
	if glog.V(10) {
		if len(predicateFails) == 0 {
			// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
			// not logged. There is visible performance gain from it.
			glog.Infof("Schedule Pod %+v on Node %+v is allowed, Node is running only %v out of %v Pods.",
				podName(pod), node.Name, len(nodeInfo.Pods()), allowedPodNumber)
		}
	}
	return len(predicateFails) == 0, predicateFails, nil
}
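The fit check itself is simple arithmetic: a resource is insufficient when allocatable < podRequest + alreadyRequested. Here is a tiny standalone sketch with made-up CPU numbers to illustrate the comparison.

package main

import "fmt"

func main() {
	var (
		allocatableMilliCPU int64 = 4000 // node allocatable: 4 CPUs
		requestedMilliCPU   int64 = 3500 // sum of requests of pods already on the node
		podRequestMilliCPU  int64 = 600  // the incoming pod's request
	)

	// Same shape as the ScalarResources check above:
	// allocatable < request + already-requested => insufficient.
	if allocatableMilliCPU < podRequestMilliCPU+requestedMilliCPU {
		fmt.Printf("Insufficient cpu: requested %d, used %d, capacity %d\n",
			podRequestMilliCPU, requestedMilliCPU, allocatableMilliCPU)
	}
}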
	// Total requested resources of all pods on this node. This includes assumed pods,
	// for which the scheduler has sent a binding to the apiserver but which have not
	// yet been reported back as scheduled.
	requestedResource *Resource
	nonzeroRequest    *Resource
	// We store allocatableResource (which is Node.Status.Allocatable.*) explicitly
	// as int64, to avoid conversions and map accesses.
	allocatableResource *Resource
	// Cached taints of the node for faster lookup.
	taints    []v1.Taint
	taintsErr error
	// imageStates holds the entry of an image if and only if this image is on the node. The entry can be used for
	// checking an image's existence and advanced usage (e.g., image locality scheduling policy) based on the image
	// state information.
	imageStates map[string]*ImageStateSummary
	// TransientInfo holds information pertaining to a scheduling cycle. It is discarded
	// at the end of the cycle.
	// TODO: @ravig. Remove this once we have a clear approach for message passing across predicates and priorities.
	TransientInfo *transientSchedulerInfo
	// Cached conditions of node for faster lookup.
	memoryPressureCondition v1.ConditionStatus
	diskPressureCondition   v1.ConditionStatus
	pidPressureCondition    v1.ConditionStatus
	// Whenever NodeInfo changes, generation is bumped.
	// This is used to avoid cloning it if the object didn't change.
	generation int64
}
// Resource is a collection of compute resources.
type Resource struct {
	MilliCPU         int64
	Memory           int64
	EphemeralStorage int64
	// We store allowedPodNumber (which is Node.Status.Allocatable.Pods().Value())
	// explicitly as int, to avoid conversions and improve performance.
	AllowedPodNumber int
	// ScalarResources holds the quantities of scalar resources, such as extended resources.
	ScalarResources map[v1.ResourceName]int64
}
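As a quick illustration of how these fields map onto node data, here is a small standalone sketch. The struct is re-declared locally so the snippet compiles on its own, and all values, including the extended resource name example.com/gpu, are made up.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// Resource mirrors the scheduler struct quoted above so this sketch is self-contained.
type Resource struct {
	MilliCPU         int64
	Memory           int64
	EphemeralStorage int64
	AllowedPodNumber int
	ScalarResources  map[v1.ResourceName]int64
}

func main() {
	allocatable := Resource{
		MilliCPU:         8000,     // 8 CPUs
		Memory:           32 << 30, // 32 GiB, in bytes
		AllowedPodNumber: 110,      // Node.Status.Allocatable.Pods().Value()
		ScalarResources: map[v1.ResourceName]int64{
			"example.com/gpu": 2, // hypothetical extended resource
		},
	}
	fmt.Printf("allocatable: %+v\n", allocatable)
}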