kube-scheduler源码分析(四)之 findNodesThatFit

以下代码分析基于 kubernetes v1.12.0 版本。

本文主要分析调度逻辑中的预选策略,即第一步筛选出符合pod调度条件的节点。

1. 调用入口

预选,通过预选函数来判断每个节点是否适合被该Pod调度。

genericScheduler.Schedule中对findNodesThatFit的调用过程如下:

此部分代码位于pkg/scheduler/core/generic_scheduler.go

func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {
    ...
  // 列出所有的节点
    nodes, err := nodeLister.List()
    if err != nil {
        return "", err
    }
    if len(nodes) == 0 {
        return "", ErrNoNodesAvailable
    }

    // Used for all fit and priority funcs.
    err = g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
    if err != nil {
        return "", err
    }

    trace.Step("Computing predicates")
    startPredicateEvalTime := time.Now()
  // 调用findNodesThatFit过滤出预选节点
    filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
    if err != nil {
        return "", err
    }

    if len(filteredNodes) == 0 {
        return "", &FitError{
            Pod:              pod,
            NumAllNodes:      len(nodes),
            FailedPredicates: failedPredicateMap,
        }
    }
// metrics
  metrics.SchedulingAlgorithmPredicateEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPredicateEvalTime))
              metrics.SchedulingLatency.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime))
    ...
}

核心代码:

// 调用findNodesThatFit过滤出预选节点
filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)

2. findNodesThatFit

findNodesThatFit基于给定的预选函数过滤node,每个node传入到预选函数中来确实该节点是否符合要求。

findNodesThatFit的入参是被调度的pod和当前的节点列表,返回预选节点列表和错误。

findNodesThatFit基本流程如下:

  1. 设置可行节点的总数,作为预选节点数组的容量,避免总节点过多需要筛选的节点过多。
  2. 通过NodeTree不断获取下一个节点来判断该节点是否满足pod的调度条件。
  3. 通过之前注册的各种预选函数来判断当前节点是否符合pod的调度条件。
  4. 最后返回满足调度条件的node列表,供下一步的优选操作。

findNodesThatFit完整代码如下:

此部分代码位于pkg/scheduler/core/generic_scheduler.go

// Filters the nodes to find the ones that fit based on the given predicate functions
// Each node is passed through the predicate functions to determine if it is a fit
func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
   var filtered []*v1.Node
   failedPredicateMap := FailedPredicateMap{}

   if len(g.predicates) == 0 {
      filtered = nodes
   } else {
      allNodes := int32(g.cache.NodeTree().NumNodes)
      numNodesToFind := g.numFeasibleNodesToFind(allNodes)

      // Create filtered list with enough space to avoid growing it
      // and allow assigning.
      filtered = make([]*v1.Node, numNodesToFind)
      errs := errors.MessageCountMap{}
      var (
         predicateResultLock sync.Mutex
         filteredLen         int32
         equivClass          *equivalence.Class
      )

      ctx, cancel := context.WithCancel(context.Background())

      // We can use the same metadata producer for all nodes.
      meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)

      if g.equivalenceCache != nil {
         // getEquivalenceClassInfo will return immediately if no equivalence pod found
         equivClass = equivalence.NewClass(pod)
      }

      checkNode := func(i int) {
         var nodeCache *equivalence.NodeCache
         nodeName := g.cache.NodeTree().Next()
         if g.equivalenceCache != nil {
            nodeCache, _ = g.equivalenceCache.GetNodeCache(nodeName)
         }
         fits, failedPredicates, err := podFitsOnNode(
            pod,
            meta,
            g.cachedNodeInfoMap[nodeName],
            g.predicates,
            g.cache,
            nodeCache,
            g.schedulingQueue,
            g.alwaysCheckAllPredicates,
            equivClass,
         )
         if err != nil {
            predicateResultLock.Lock()
            errs[err.Error()]++
            predicateResultLock.Unlock()
            return
         }
         if fits {
            length := atomic.AddInt32(&filteredLen, 1)
            if length > numNodesToFind {
               cancel()
               atomic.AddInt32(&filteredLen, -1)
            } else {
               filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
            }
         } else {
            predicateResultLock.Lock()
            failedPredicateMap[nodeName] = failedPredicates
            predicateResultLock.Unlock()
         }
      }

      // Stops searching for more nodes once the configured number of feasible nodes
      // are found.
      workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)

      filtered = filtered[:filteredLen]
      if len(errs) > 0 {
         return []*v1.Node{}, FailedPredicateMap{}, errors.CreateAggregateFromMessageCountMap(errs)
      }
   }

   if len(filtered) > 0 && len(g.extenders) != 0 {
      for _, extender := range g.extenders {
         if !extender.IsInterested(pod) {
            continue
         }
         filteredList, failedMap, err := extender.Filter(pod, filtered, g.cachedNodeInfoMap)
         if err != nil {
            if extender.IsIgnorable() {
               glog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
                  extender, err)
               continue
            } else {
               return []*v1.Node{}, FailedPredicateMap{}, err
            }
         }

         for failedNodeName, failedMsg := range failedMap {
            if _, found := failedPredicateMap[failedNodeName]; !found {
               failedPredicateMap[failedNodeName] = []algorithm.PredicateFailureReason{}
            }
            failedPredicateMap[failedNodeName] = append(failedPredicateMap[failedNodeName], predicates.NewFailureReason(failedMsg))
         }
         filtered = filteredList
         if len(filtered) == 0 {
            break
         }
      }
   }
   return filtered, failedPredicateMap, nil
}

以下对findNodesThatFit分段分析。

3. numFeasibleNodesToFind

findNodesThatFit先基于所有的节点找出可行的节点是总数。numFeasibleNodesToFind的作用主要是避免当节点过多(超过100)影响调度的效率。

allNodes := int32(g.cache.NodeTree().NumNodes)
numNodesToFind := g.numFeasibleNodesToFind(allNodes)

// Create filtered list with enough space to avoid growing it
// and allow assigning.
filtered = make([]*v1.Node, numNodesToFind)

numFeasibleNodesToFind基本流程如下:

  • 如果所有的node节点小于minFeasibleNodesToFind(当前默认为100)则返回节点数。
  • 如果节点数超100,则取指定计分的百分比的节点数,当该百分比后的数目仍小于minFeasibleNodesToFind,则返回minFeasibleNodesToFind
  • 如果百分比后的数目大于minFeasibleNodesToFind,则返回该百分比。
// numFeasibleNodesToFind returns the number of feasible nodes that once found, the scheduler stops
// its search for more feasible nodes.
func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) int32 {
    if numAllNodes < minFeasibleNodesToFind || g.percentageOfNodesToScore <= 0 ||
        g.percentageOfNodesToScore >= 100 {
        return numAllNodes
    }
    numNodes := numAllNodes * g.percentageOfNodesToScore / 100
    if numNodes < minFeasibleNodesToFind {
        return minFeasibleNodesToFind
    }
    return numNodes
}

4. checkNode

checkNode是一个校验node是否符合要求的函数,其中实际调用到的核心函数是podFitsOnNode。再通过workqueue并发执行checkNode操作。

checkNode主要流程如下:

  1. 通过cache中的nodeTree不断获取下一个node。
  2. 将当前node和pod传入podFitsOnNode判断当前node是否符合要求。
  3. 如果当前node符合要求就将当前node加入预选节点的数组中filtered
  4. 如果当前node不满足要求,则加入到失败的数组中,并记录原因。
  5. 通过workqueue.ParallelizeUntil并发执行checkNode函数,一旦找到配置的可行节点数,就停止搜索更多节点。
checkNode := func(i int) {
    var nodeCache *equivalence.NodeCache
    nodeName := g.cache.NodeTree().Next()
    if g.equivalenceCache != nil {
        nodeCache, _ = g.equivalenceCache.GetNodeCache(nodeName)
    }
    fits, failedPredicates, err := podFitsOnNode(
        pod,
        meta,
        g.cachedNodeInfoMap[nodeName],
        g.predicates,
        g.cache,
        nodeCache,
        g.schedulingQueue,
        g.alwaysCheckAllPredicates,
        equivClass,
    )
    if err != nil {
        predicateResultLock.Lock()
        errs[err.Error()]++
        predicateResultLock.Unlock()
        return
    }
    if fits {
        length := atomic.AddInt32(&filteredLen, 1)
        if length > numNodesToFind {
            cancel()
            atomic.AddInt32(&filteredLen, -1)
        } else {
            filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
        }
    } else {
        predicateResultLock.Lock()
        failedPredicateMap[nodeName] = failedPredicates
        predicateResultLock.Unlock()
    }
}

workqueue的并发操作:

// Stops searching for more nodes once the configured number of feasible nodes
// are found.
workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)

ParallelizeUntil具体代码如下:

// ParallelizeUntil is a framework that allows for parallelizing N
// independent pieces of work until done or the context is canceled.
func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc) {
    var stop <-chan struct{}
    if ctx != nil {
        stop = ctx.Done()
    }

    toProcess := make(chan int, pieces)
    for i := 0; i < pieces; i++ {
        toProcess <- i
    }
    close(toProcess)

    if pieces < workers {
        workers = pieces
    }

    wg := sync.WaitGroup{}
    wg.Add(workers)
    for i := 0; i < workers; i++ {
        go func() {
            defer utilruntime.HandleCrash()
            defer wg.Done()
            for piece := range toProcess {
                select {
                case <-stop:
                    return
                default:
                    doWorkPiece(piece)
                }
            }
        }()
    }
    wg.Wait()
}

5. podFitsOnNode

podFitsOnNode主要内容如下:

  • podFitsOnNode会检查给定的某个Node是否满足预选的函数。

  • 对于给定的pod,podFitsOnNode会检查是否有相同的pod存在,尽量复用缓存过的预选结果。

podFitsOnNode主要在Schedule(调度)和Preempt(抢占)的时候被调用。

当在Schedule中被调用的时候,主要判断是否可以被调度到当前节点,依据为当前节点上所有已存在的pod及被提名要运行到该节点的具有相等或更高优先级的pod。

当在Preempt中被调用的时候,即发生抢占的时候,通过SelectVictimsOnNode函数选出需要被移除的pod,移除后然后将预调度的pod调度到该节点上。

podFitsOnNode基本流程如下:

  1. 遍历之前注册好的预选策略predicates.Ordering,并获取预选策略的执行函数。
  2. 遍历执行每个预选函数,并返回是否合适,预选失败的原因和错误。
  3. 如果预选函数执行的结果不合适,则加入预选失败的数组中。
  4. 最后返回预选失败的个数是否为0,和预选失败的原因。

入参:

  • pod
  • PredicateMetadata
  • NodeInfo
  • predicateFuncs
  • schedulercache.Cache
  • nodeCache
  • SchedulingQueue
  • alwaysCheckAllPredicates
  • equivClass

出参:

  • fit
  • PredicateFailureReason

完整代码如下:

此部分代码位于pkg/scheduler/core/generic_scheduler.go

// podFitsOnNode checks whether a node given by NodeInfo satisfies the given predicate functions.
// For given pod, podFitsOnNode will check if any equivalent pod exists and try to reuse its cached
// predicate results as possible.
// This function is called from two different places: Schedule and Preempt.
// When it is called from Schedule, we want to test whether the pod is schedulable
// on the node with all the existing pods on the node plus higher and equal priority
// pods nominated to run on the node.
// When it is called from Preempt, we should remove the victims of preemption and
// add the nominated pods. Removal of the victims is done by SelectVictimsOnNode().
// It removes victims from meta and NodeInfo before calling this function.
func podFitsOnNode(
    pod *v1.Pod,
    meta algorithm.PredicateMetadata,
    info *schedulercache.NodeInfo,
    predicateFuncs map[string]algorithm.FitPredicate,
    cache schedulercache.Cache,
    nodeCache *equivalence.NodeCache,
    queue SchedulingQueue,
    alwaysCheckAllPredicates bool,
    equivClass *equivalence.Class,
) (bool, []algorithm.PredicateFailureReason, error) {
    var (
        eCacheAvailable  bool
        failedPredicates []algorithm.PredicateFailureReason
    )

    podsAdded := false
    // We run predicates twice in some cases. If the node has greater or equal priority
    // nominated pods, we run them when those pods are added to meta and nodeInfo.
    // If all predicates succeed in this pass, we run them again when these
    // nominated pods are not added. This second pass is necessary because some
    // predicates such as inter-pod affinity may not pass without the nominated pods.
    // If there are no nominated pods for the node or if the first run of the
    // predicates fail, we don't run the second pass.
    // We consider only equal or higher priority pods in the first pass, because
    // those are the current "pod" must yield to them and not take a space opened
    // for running them. It is ok if the current "pod" take resources freed for
    // lower priority pods.
    // Requiring that the new pod is schedulable in both circumstances ensures that
    // we are making a conservative decision: predicates like resources and inter-pod
    // anti-affinity are more likely to fail when the nominated pods are treated
    // as running, while predicates like pod affinity are more likely to fail when
    // the nominated pods are treated as not running. We can't just assume the
    // nominated pods are running because they are not running right now and in fact,
    // they may end up getting scheduled to a different node.
    for i := 0; i < 2; i++ {
        metaToUse := meta
        nodeInfoToUse := info
        if i == 0 {
            podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(util.GetPodPriority(pod), meta, info, queue)
        } else if !podsAdded || len(failedPredicates) != 0 {
            break
        }
        // Bypass eCache if node has any nominated pods.
        // TODO(bsalamat): consider using eCache and adding proper eCache invalidations
        // when pods are nominated or their nominations change.
        eCacheAvailable = equivClass != nil && nodeCache != nil && !podsAdded
        for _, predicateKey := range predicates.Ordering() {
            var (
                fit     bool
                reasons []algorithm.PredicateFailureReason
                err     error
            )
            //TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
            if predicate, exist := predicateFuncs[predicateKey]; exist {
                if eCacheAvailable {
                    fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, pod, metaToUse, nodeInfoToUse, equivClass, cache)
                } else {
                    fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
                }
                if err != nil {
                    return false, []algorithm.PredicateFailureReason{}, err
                }

                if !fit {
                    // eCache is available and valid, and predicates result is unfit, record the fail reasons
                    failedPredicates = append(failedPredicates, reasons...)
                    // if alwaysCheckAllPredicates is false, short circuit all predicates when one predicate fails.
                    if !alwaysCheckAllPredicates {
                        glog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate " +
                            "evaluation is short circuited and there are chances " +
                            "of other predicates failing as well.")
                        break
                    }
                }
            }
        }
    }

    return len(failedPredicates) == 0, failedPredicates, nil
}

5.1. predicateFuncs

根据之前初注册好的预选策略函数来执行预选,判断节点是否符合调度。

for _, predicateKey := range predicates.Ordering() {
    if predicate, exist := predicateFuncs[predicateKey]; exist {
        if eCacheAvailable {
            fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, pod, metaToUse, nodeInfoToUse, equivClass, cache)
        } else {
            fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
        }

预选策略如下:

var (
    predicatesOrdering = []string{CheckNodeConditionPred, CheckNodeUnschedulablePred,
        GeneralPred, HostNamePred, PodFitsHostPortsPred,
        MatchNodeSelectorPred, PodFitsResourcesPred, NoDiskConflictPred,
        PodToleratesNodeTaintsPred, PodToleratesNodeNoExecuteTaintsPred, CheckNodeLabelPresencePred,
        CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred, MaxCSIVolumeCountPred,
        MaxAzureDiskVolumeCountPred, CheckVolumeBindingPred, NoVolumeZoneConflictPred,
        CheckNodeMemoryPressurePred, CheckNodePIDPressurePred, CheckNodeDiskPressurePred, MatchInterPodAffinityPred}
)

6. PodFitsResources

以下以PodFitsResources这个预选函数为例做分析,其他重要的预选函数待后续单独分析。

PodFitsResources用来检查一个节点是否有足够的资源来运行当前的pod,包括CPU、内存、GPU等。

PodFitsResources基本流程如下:

  1. 判断当前节点上pod总数加上预调度pod个数是否大于node的可分配pod总数,若是则不允许调度。
  2. 判断pod的request值是否都为0,若是则允许调度。
  3. 判断pod的request值加上当前node上所有pod的request值总和是否大于node的可分配资源,若是则不允许调度。
  4. 判断pod的拓展资源request值加上当前node上所有pod对应的request值总和是否大于node对应的可分配资源,若是则不允许调度。

PodFitsResources的注册代码如下:

factory.RegisterFitPredicate(predicates.PodFitsResourcesPred, predicates.PodFitsResources)

PodFitsResources入参:

  • pod

  • nodeInfo

  • PredicateMetadata

PodFitsResources出参:

  • fit
  • PredicateFailureReason

PodFitsResources完整代码:

此部分的代码位于pkg/scheduler/algorithm/predicates/predicates.go

// PodFitsResources checks if a node has sufficient resources, such as cpu, memory, gpu, opaque int resources etc to run a pod.
// First return value indicates whether a node has sufficient resources to run a pod while the second return value indicates the
// predicate failure reasons if the node has insufficient resources to run the pod.
func PodFitsResources(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
    node := nodeInfo.Node()
    if node == nil {
        return false, nil, fmt.Errorf("node not found")
    }

    var predicateFails []algorithm.PredicateFailureReason
    allowedPodNumber := nodeInfo.AllowedPodNumber()
    if len(nodeInfo.Pods())+1 > allowedPodNumber {
        predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber)))
    }

    // No extended resources should be ignored by default.
    ignoredExtendedResources := sets.NewString()

    var podRequest *schedulercache.Resource
    if predicateMeta, ok := meta.(*predicateMetadata); ok {
        podRequest = predicateMeta.podRequest
        if predicateMeta.ignoredExtendedResources != nil {
            ignoredExtendedResources = predicateMeta.ignoredExtendedResources
        }
    } else {
        // We couldn't parse metadata - fallback to computing it.
        podRequest = GetResourceRequest(pod)
    }
    if podRequest.MilliCPU == 0 &&
        podRequest.Memory == 0 &&
        podRequest.EphemeralStorage == 0 &&
        len(podRequest.ScalarResources) == 0 {
        return len(predicateFails) == 0, predicateFails, nil
    }

    allocatable := nodeInfo.AllocatableResource()
    if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {
        predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceCPU, podRequest.MilliCPU, nodeInfo.RequestedResource().MilliCPU, allocatable.MilliCPU))
    }
    if allocatable.Memory < podRequest.Memory+nodeInfo.RequestedResource().Memory {
        predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceMemory, podRequest.Memory, nodeInfo.RequestedResource().Memory, allocatable.Memory))
    }
    if allocatable.EphemeralStorage < podRequest.EphemeralStorage+nodeInfo.RequestedResource().EphemeralStorage {
        predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceEphemeralStorage, podRequest.EphemeralStorage, nodeInfo.RequestedResource().EphemeralStorage, allocatable.EphemeralStorage))
    }

    for rName, rQuant := range podRequest.ScalarResources {
        if v1helper.IsExtendedResourceName(rName) {
            // If this resource is one of the extended resources that should be
            // ignored, we will skip checking it.
            if ignoredExtendedResources.Has(string(rName)) {
                continue
            }
        }
        if allocatable.ScalarResources[rName] < rQuant+nodeInfo.RequestedResource().ScalarResources[rName] {
            predicateFails = append(predicateFails, NewInsufficientResourceError(rName, podRequest.ScalarResources[rName], nodeInfo.RequestedResource().ScalarResources[rName], allocatable.ScalarResources[rName]))
        }
    }

    if glog.V(10) {
        if len(predicateFails) == 0 {
            // We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
            // not logged. There is visible performance gain from it.
            glog.Infof("Schedule Pod %+v on Node %+v is allowed, Node is running only %v out of %v Pods.",
                podName(pod), node.Name, len(nodeInfo.Pods()), allowedPodNumber)
        }
    }
    return len(predicateFails) == 0, predicateFails, nil
}

6.1. NodeInfo

NodeInfo是node的聚合信息,主要包括:

  • node:k8s node的结构体
  • pods:当前node上pod的数量
  • requestedResource:当前node上所有pod的request总和
  • allocatableResource:node的实际所有的可分配资源(对应于Node.Status.Allocatable.*),可理解为node的资源总量。

此部分代码位于pkg/scheduler/cache/node_info.go

// NodeInfo is node level aggregated information.
type NodeInfo struct {
    // Overall node information.
    node *v1.Node

    pods             []*v1.Pod
    podsWithAffinity []*v1.Pod
    usedPorts        util.HostPortInfo

    // Total requested resource of all pods on this node.
    // It includes assumed pods which scheduler sends binding to apiserver but
    // didn't get it as scheduled yet.
    requestedResource *Resource
    nonzeroRequest    *Resource
    // We store allocatedResources (which is Node.Status.Allocatable.*) explicitly
    // as int64, to avoid conversions and accessing map.
    allocatableResource *Resource

    // Cached taints of the node for faster lookup.
    taints    []v1.Taint
    taintsErr error

    // imageStates holds the entry of an image if and only if this image is on the node. The entry can be used for
    // checking an image's existence and advanced usage (e.g., image locality scheduling policy) based on the image
    // state information.
    imageStates map[string]*ImageStateSummary

    // TransientInfo holds the information pertaining to a scheduling cycle. This will be destructed at the end of
    // scheduling cycle.
    // TODO: @ravig. Remove this once we have a clear approach for message passing across predicates and priorities.
    TransientInfo *transientSchedulerInfo

    // Cached conditions of node for faster lookup.
    memoryPressureCondition v1.ConditionStatus
    diskPressureCondition   v1.ConditionStatus
    pidPressureCondition    v1.ConditionStatus

    // Whenever NodeInfo changes, generation is bumped.
    // This is used to avoid cloning it if the object didn't change.
    generation int64
}

6.2. Resource

Resource是可计算资源的集合体。主要包括:

  • MilliCPU
  • Memory
  • EphemeralStorage
  • AllowedPodNumber:允许的pod总数(对应于Node.Status.Allocatable.Pods().Value()),一般为110。
  • ScalarResources
// Resource is a collection of compute resource.
type Resource struct {
    MilliCPU         int64
    Memory           int64
    EphemeralStorage int64
    // We store allowedPodNumber (which is Node.Status.Allocatable.Pods().Value())
    // explicitly as int, to avoid conversions and improve performance.
    AllowedPodNumber int
    // ScalarResources
    ScalarResources map[v1.ResourceName]int64
}

以下分析podFitsOnNode的具体流程。

6.3. allowedPodNumber

首先获取节点的信息,先判断如果该节点当前所有的pod的个数加上当前预调度的pod是否会大于该节点允许的pod的总数,一般为110个。如果超过,则predicateFails数组增加1,即当前节点不适合该pod。

node := nodeInfo.Node()
if node == nil {
    return false, nil, fmt.Errorf("node not found")
}

var predicateFails []algorithm.PredicateFailureReason
allowedPodNumber := nodeInfo.AllowedPodNumber()
if len(nodeInfo.Pods())+1 > allowedPodNumber {
    predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber)))
    }

6.4. podRequest

如果podRequest都为0,则允许调度到该节点,直接返回结果。

if podRequest.MilliCPU == 0 &&
    podRequest.Memory == 0 &&
    podRequest.EphemeralStorage == 0 &&
    len(podRequest.ScalarResources) == 0 {
    return len(predicateFails) == 0, predicateFails, nil
}

6.5. AllocatableResource

如果当前预调度的pod的request资源加上当前node上所有pod的request总和大于该node的可分配资源总量,则不允许调度到该节点,直接返回结果。其中request资源包括CPU、内存、storage。

allocatable := nodeInfo.AllocatableResource()
if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {
    predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceCPU, podRequest.MilliCPU, nodeInfo.RequestedResource().MilliCPU, allocatable.MilliCPU))
}
if allocatable.Memory < podRequest.Memory+nodeInfo.RequestedResource().Memory {
    predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceMemory, podRequest.Memory, nodeInfo.RequestedResource().Memory, allocatable.Memory))
}
if allocatable.EphemeralStorage < podRequest.EphemeralStorage+nodeInfo.RequestedResource().EphemeralStorage {
    predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceEphemeralStorage, podRequest.EphemeralStorage, nodeInfo.RequestedResource().EphemeralStorage, allocatable.EphemeralStorage))
    }

6.6. ScalarResources

判断其他拓展的标量资源,是否该pod的request值加上当前node上所有pod的对应资源的request总和大于该node上对应资源的可分配总量,如果是,则不允许调度到该节点。

for rName, rQuant := range podRequest.ScalarResources {
    if v1helper.IsExtendedResourceName(rName) {
        // If this resource is one of the extended resources that should be
        // ignored, we will skip checking it.
        if ignoredExtendedResources.Has(string(rName)) {
            continue
        }
    }
    if allocatable.ScalarResources[rName] < rQuant+nodeInfo.RequestedResource().ScalarResources[rName] {
        predicateFails = append(predicateFails, NewInsufficientResourceError(rName, podRequest.ScalarResources[rName], nodeInfo.RequestedResource().ScalarResources[rName], allocatable.ScalarResources[rName]))
    }
}

7. 总结

findNodesThatFit基于给定的预选函数过滤node,每个node传入到预选函数中来确实该节点是否符合要求。

findNodesThatFit的入参是被调度的pod和当前的节点列表,返回预选节点列表和错误。

findNodesThatFit基本流程如下:

  1. 设置可行节点的总数,作为预选节点数组的容量,避免总节点过多导致需要筛选的节点过多,效率低。
  2. 通过NodeTree不断获取下一个节点来判断该节点是否满足pod的调度条件。
  3. 通过之前注册的各种预选函数来判断当前节点是否符合pod的调度条件。
  4. 最后返回满足调度条件的node列表,供下一步的优选操作。

7.1. checkNode

checkNode是一个校验node是否符合要求的函数,其中实际调用到的核心函数是podFitsOnNode。再通过workqueue并发执行checkNode操作。

checkNode主要流程如下:

  1. 通过cache中的nodeTree不断获取下一个node。
  2. 将当前node和pod传入podFitsOnNode判断当前node是否符合要求。
  3. 如果当前node符合要求就将当前node加入预选节点的数组中filtered
  4. 如果当前node不满足要求,则加入到失败的数组中,并记录原因。
  5. 通过workqueue.ParallelizeUntil并发执行checkNode函数,一旦找到配置的可行节点数,就停止搜索更多节点。

7.2. podFitsOnNode

其中会调用到核心函数podFitsOnNode。

podFitsOnNode主要内容如下:

  • podFitsOnNode会检查给定的某个Node是否满足预选的函数。

  • 对于给定的pod,podFitsOnNode会检查是否有相同的pod存在,尽量复用缓存过的预选结果。

podFitsOnNode主要在Schedule(调度)和Preempt(抢占)的时候被调用。

当在Schedule中被调用的时候,主要判断是否可以被调度到当前节点,依据为当前节点上所有已存在的pod及被提名要运行到该节点的具有相等或更高优先级的pod。

当在Preempt中被调用的时候,即发生抢占的时候,通过SelectVictimsOnNode函数选出需要被移除的pod,移除后然后将预调度的pod调度到该节点上。

podFitsOnNode基本流程如下:

  1. 遍历之前注册好的预选策略predicates.Ordering,并获取预选策略的执行函数。
  2. 遍历执行每个预选函数,并返回是否合适,预选失败的原因和错误。
  3. 如果预选函数执行的结果不合适,则加入预选失败的数组中。
  4. 最后返回预选失败的个数是否为0,和预选失败的原因。

7.3. PodFitsResources

本文只示例分析了其中一个重要的预选函数:PodFitsResources

PodFitsResources用来检查一个节点是否有足够的资源来运行当前的pod,包括CPU、内存、GPU等。

PodFitsResources基本流程如下:

  1. 判断当前节点上pod总数加上预调度pod个数是否大于node的可分配pod总数,若是则不允许调度。
  2. 判断pod的request值是否都为0,若是则允许调度。
  3. 判断pod的request值加上当前node上所有pod的request值总和是否大于node的可分配资源,若是则不允许调度。
  4. 判断pod的拓展资源request值加上当前node上所有pod对应的request值总和是否大于node对应的可分配资源,若是则不允许调度。

参考:

Copyright © www.huweihuang.com 2017-2018 all right reserved,powered by GitbookUpdated at 2019-07-31 10:18:04

results matching ""

    No results matching ""