本文主要分析DaemonSetController的源码逻辑,daemonset是运行在指定节点上的服务,常用来作为agent类的服务来配置,也是k8s最常用的控制器之一。

1. startDaemonSetController

startDaemonSetController是入口函数,先New后Run。

// startDaemonSetController wires up and launches the DaemonSet controller.
// It builds the controller from the shared informer factory, then runs it in
// a background goroutine with the configured number of sync workers.
func startDaemonSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
	informers := controllerContext.InformerFactory
	ds, err := daemon.NewDaemonSetsController(
		ctx,
		informers.Apps().V1().DaemonSets(),
		informers.Apps().V1().ControllerRevisions(),
		informers.Core().V1().Pods(),
		informers.Core().V1().Nodes(),
		controllerContext.ClientBuilder.ClientOrDie("daemon-set-controller"),
		flowcontrol.NewBackOff(1*time.Second, 15*time.Minute),
	)
	if err != nil {
		return nil, true, fmt.Errorf("error creating DaemonSets controller: %v", err)
	}
	workers := int(controllerContext.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs)
	go ds.Run(ctx, workers)
	return nil, true, nil
}

2. NewDaemonSetsController

NewDaemonSetsController仍然是常见的k8s controller初始化逻辑:

  • 常用配置初始化
  • 根据所需要的对象,添加event handler,便于监听所需要对象的事件变化,此处包括ds、controllerrevision(history)、pod、node四个对象。
  • 赋值syncHandler,即具体实现是syncDaemonSet函数。
// NewDaemonSetsController builds a DaemonSetsController wired to the shared
// informers for DaemonSets, ControllerRevisions, Pods and Nodes, and returns
// it ready to be started with Run.
//
// NOTE(review): this snippet is abridged — eventBroadcaster and logger are
// declared in the elided portion of the original source.
func NewDaemonSetsController(
    ctx context.Context,
    daemonSetInformer appsinformers.DaemonSetInformer,
    historyInformer appsinformers.ControllerRevisionInformer,
    podInformer coreinformers.PodInformer,
    nodeInformer coreinformers.NodeInformer,
    kubeClient clientset.Interface,
    failedPodsBackoff *flowcontrol.Backoff,
) (*DaemonSetsController, error) {

  // Initialize the controller's common fields: client, event recorder,
  // pod/ControllerRevision control helpers, expectations and work queue.
    dsc := &DaemonSetsController{
        kubeClient:       kubeClient,
        eventBroadcaster: eventBroadcaster,
        eventRecorder:    eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
        podControl: controller.RealPodControl{
            KubeClient: kubeClient,
            Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
        },
        crControl: controller.RealControllerRevisionControl{
            KubeClient: kubeClient,
        },
        burstReplicas: BurstReplicas,
        expectations:  controller.NewControllerExpectations(),
        queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
    }

  // Register event handlers for DaemonSet add/update/delete events.
    daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            dsc.addDaemonset(logger, obj)
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            dsc.updateDaemonset(logger, oldObj, newObj)
        },
        DeleteFunc: func(obj interface{}) {
            dsc.deleteDaemonset(logger, obj)
        },
    })
    dsc.dsLister = daemonSetInformer.Lister()
    dsc.dsStoreSynced = daemonSetInformer.Informer().HasSynced

    // Register event handlers for ControllerRevision (revision history) events.
    historyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            dsc.addHistory(logger, obj)
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            dsc.updateHistory(logger, oldObj, newObj)
        },
        DeleteFunc: func(obj interface{}) {
            dsc.deleteHistory(logger, obj)
        },
    })
    dsc.historyLister = historyInformer.Lister()
    dsc.historyStoreSynced = historyInformer.Informer().HasSynced

    // Register event handlers for Pod events (daemon pods owned by DaemonSets).
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            dsc.addPod(logger, obj)
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            dsc.updatePod(logger, oldObj, newObj)
        },
        DeleteFunc: func(obj interface{}) {
            dsc.deletePod(logger, obj)
        },
    })
    dsc.podLister = podInformer.Lister()
    dsc.podStoreSynced = podInformer.Informer().HasSynced

  // Register event handlers for Node events. Note that only add and update
  // are handled here — there is no DeleteFunc for nodes in this snippet.
    nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            dsc.addNode(logger, obj)
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            dsc.updateNode(logger, oldObj, newObj)
        },
    },
    )
    dsc.nodeStoreSynced = nodeInformer.Informer().HasSynced
    dsc.nodeLister = nodeInformer.Lister()

  // Set syncDaemonSet as the reconcile entry point invoked by the workers.
    dsc.syncHandler = dsc.syncDaemonSet
    dsc.enqueueDaemonSet = dsc.enqueue

    dsc.failedPodsBackoff = failedPodsBackoff

    return dsc, nil
}

3. Run

Run函数与其他controller的逻辑一致不再分析,具体可以阅读本源码分析系列的replicaset-controller分析。

// Run starts the event broadcaster and the sync workers, then blocks until
// the context is cancelled. workers is the number of concurrent goroutines
// draining the work queue.
func (dsc *DaemonSetsController) Run(ctx context.Context, workers int) {
	defer utilruntime.HandleCrash()

	// Start emitting events before any worker can record one.
	dsc.eventBroadcaster.StartStructuredLogging(0)
	dsc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dsc.kubeClient.CoreV1().Events("")})
	defer dsc.eventBroadcaster.Shutdown()

	defer dsc.queue.ShutDown()

	log := klog.FromContext(ctx)
	log.Info("Starting daemon sets controller")
	defer log.Info("Shutting down daemon sets controller")

	// Do not process anything until every informer cache has synced.
	if !cache.WaitForNamedCacheSync("daemon sets", ctx.Done(), dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.historyStoreSynced, dsc.dsStoreSynced) {
		return
	}

	// Launch the worker pool; each worker re-enters the queue loop once per second.
	for w := 0; w < workers; w++ {
		go wait.UntilWithContext(ctx, dsc.runWorker, time.Second)
	}

	// Periodically garbage-collect stale failed-pod backoff entries.
	go wait.Until(dsc.failedPodsBackoff.GC, BackoffGCInterval, ctx.Done())

	<-ctx.Done()
}

3.1. processNextWorkItem

processNextWorkItem可参考replicaset-controller对该部分的分析。

// runWorker drains the work queue until processNextWorkItem reports that the
// queue has been shut down.
func (dsc *DaemonSetsController) runWorker(ctx context.Context) {
	for {
		if !dsc.processNextWorkItem(ctx) {
			return
		}
	}
}

// processNextWorkItem deals with one key off the queue.  It returns false when it's time to quit.
func (dsc *DaemonSetsController) processNextWorkItem(ctx context.Context) bool {
   dsKey, quit := dsc.queue.Get()
   if quit {
      return false
   }
   defer dsc.queue.Done(dsKey)

   err := dsc.syncHandler(ctx, dsKey.(string))
   if err == nil {
      dsc.queue.Forget(dsKey)
      return true
   }

   utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
   dsc.queue.AddRateLimited(dsKey)

   return true
}

4. syncDaemonSet

syncDaemonSet是控制器具体的实现逻辑,即每个控制器的核心实现大部分在syncHandler这个函数中。

// syncDaemonSet is the controller's reconcile function for a single
// DaemonSet, identified by its namespace/name key.
//
// NOTE(review): this snippet is abridged — logger, dsKey and the error checks
// after SplitMetaNamespaceKey / Get / List are declared or performed in the
// elided portion of the original source.
func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string) error {
   // NOTE: non-essential parts have been removed to highlight the key logic.
     // Fetch the DaemonSet object from the lister cache.
   namespace, name, err := cache.SplitMetaNamespaceKey(key)
   ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)

     // List all nodes in the cluster.
   nodeList, err := dsc.nodeLister.List(labels.Everything())
   everything := metav1.LabelSelector{}
   // Refuse to act on a DaemonSet whose selector matches everything; such a
   // selector would adopt every pod in the namespace.
   if reflect.DeepEqual(ds.Spec.Selector, &everything) {
      dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, SelectingAllReason, "This daemon set is selecting all pods. A non-empty selector is required.")
      return nil
   }

   // Construct histories of the DaemonSet, and get the hash of current history
   cur, old, err := dsc.constructHistory(ctx, ds)
   hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]

   // If earlier creates/deletes have not been observed yet, skip this sync
   // and only refresh status, to avoid acting on a stale cache.
   if !dsc.expectations.SatisfiedExpectations(logger, dsKey) {
      // Only update status. Don't raise observedGeneration since controller didn't process object of that generation.
      return dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, false)
   }

   // Create and delete daemon pods as needed for this DaemonSet.
   err = dsc.updateDaemonSet(ctx, ds, nodeList, hash, dsKey, old)
   // Update the DaemonSet's status, regardless of whether the sync errored.
   statusErr := dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, true)
   switch {
   case err != nil && statusErr != nil:
      // The sync error takes precedence; the status error is only logged.
      logger.Error(statusErr, "Failed to update status", "daemonSet", klog.KObj(ds))
      return err
   case err != nil:
      return err
   case statusErr != nil:
      return statusErr
   }

   return nil
}

5. manage

manage是daemonset中pod创建及删除的具体实现。manage的调用入口在updateDaemonSet函数中。

// updateDaemonSet performs one reconciliation pass for ds: it delegates pod
// creation/deletion to manage, then prunes old ControllerRevisions.
// (Abridged: the rolling-update branch of the original source is elided.)
func (dsc *DaemonSetsController) updateDaemonSet(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash, key string, old []*apps.ControllerRevision) error {
	// manage creates and deletes daemon pods so that each eligible node runs
	// exactly one. Bug fix: the original snippet assigned this error and then
	// overwrote it without checking, silently continuing after a failed sync.
	if err := dsc.manage(ctx, ds, nodeList, hash); err != nil {
		return err
	}

	if err := dsc.cleanupHistory(ctx, ds, old); err != nil {
		return fmt.Errorf("failed to clean up revisions of DaemonSet: %w", err)
	}

	return nil
}

以下是manage的具体代码

// manage computes, for every node, which daemon pods must be created and
// which must be deleted so that the DaemonSet's placement is satisfied, then
// applies those changes via syncNodes.
//
// NOTE(review): this snippet is abridged — logger and the error check after
// getNodesToDaemonPods are in the elided portion of the original source.
func (dsc *DaemonSetsController) manage(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
   // Build the node -> daemon pods mapping for this DaemonSet.
   nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, false)
     // For each node, decide which pods need to be created and which deleted.
   var nodesNeedingDaemonPods, podsToDelete []string
   for _, node := range nodeList {
      nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode := dsc.podsShouldBeOnNode(
         logger, node, nodeToDaemonPods, ds, hash)

      nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
      podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
   }

   // Remove unscheduled pods assigned to not existing nodes when daemonset pods are scheduled by scheduler.
   // If node doesn't exist then pods are never scheduled and can't be deleted by PodGCController.
   podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...)

   // Apply the computed plan: create and delete the daemon pods.
   if err = dsc.syncNodes(ctx, ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
      return err
   }

   return nil
}

6. syncNodes

syncNodes根据传入的需要创建和删除的pod,实现具体的创建和删除pod的操作。

// syncNodes creates daemon pods on the nodes in nodesNeedingDaemonPods and
// deletes the pods named in podsToDelete, doing both concurrently.
//
// NOTE(review): this snippet is heavily abridged — createDiff, deleteDiff,
// errCh, createWait, podTemplate, logger and the per-call error handling are
// declared or performed in the elided portion of the original source.
func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
    // Secondary code has been removed from this excerpt.
    dsKey, err := controller.KeyFunc(ds)

    // Slow-start batching: start with a small batch and double it on each
    // successful round, so a persistently failing create does not hammer the
    // API server with createDiff requests at once.
    batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize)
    for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
        errorCount := len(errCh)
        createWait.Add(batchSize)
        for i := pos; i < pos+batchSize; i++ {
            go func(ix int) {
                defer createWait.Done()
                // Create one daemon pod, owned by ds via a controller reference.
                err := dsc.podControl.CreatePods(ctx, ds.Namespace, podTemplate,
                    ds, metav1.NewControllerRef(ds, controllerKind))
            }(i)
        }
        createWait.Wait()
    }

    // Delete all surplus pods concurrently, one goroutine per pod.
    deleteWait := sync.WaitGroup{}
    deleteWait.Add(deleteDiff)
    for i := 0; i < deleteDiff; i++ {
        go func(ix int) {
            defer deleteWait.Done()
      // On delete failure, roll back the expectation so the next sync is
      // not blocked waiting for a deletion that never happened.
            if err := dsc.podControl.DeletePod(ctx, ds.Namespace, podsToDelete[ix], ds); err != nil {
                dsc.expectations.DeletionObserved(logger, dsKey)
            }
        }(i)
    }
    deleteWait.Wait()

    // Collect every error reported by the create/delete goroutines and
    // return them as a single aggregate.
    errors := []error{}
    close(errCh)
    for err := range errCh {
        errors = append(errors, err)
    }
    return utilerrors.NewAggregate(errors)
}

参考:

Copyright © www.huweihuang.com 2017-2018 all rights reserved, powered by GitBook. Updated at 2024-07-05 19:06:21

results matching ""

    No results matching ""