// start the kubelet server
if enableServer {
	go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
}
if kubeCfg.ReadOnlyPort > 0 {
	go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
}
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	if kl.logServer == nil {
		kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
	}
	if kl.kubeClient == nil {
		glog.Warning("No api server defined - no node status update will be sent.")
	}

	// Start the cloud provider sync manager
	if kl.cloudResourceSyncManager != nil {
		go kl.cloudResourceSyncManager.Run(wait.NeverStop)
	}

	// Start volume manager
	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

	if kl.kubeClient != nil {
		// Start syncing node status immediately, this may set up things the runtime needs to run.
		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
		go kl.fastStatusUpdateOnce()

		// start syncing lease
		if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
			go kl.nodeLeaseController.Run(wait.NeverStop)
		}
	}
	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

	// Start loop to sync iptables util rules
	if kl.makeIPTablesUtilChains {
		go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
	}

	// Start a goroutine responsible for killing pods (that are not properly
	// handled by pod workers).
	go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)
// initializeModules will initialize internal modules that do not require the container runtime to be up.
// Note that the modules here must not depend on modules that are not initialized here.
func (kl *Kubelet) initializeModules() error {
	// Prometheus metrics.
	metrics.Register(kl.runtimeCache, collectors.NewVolumeStatsCollector(kl))

	// If the container logs directory does not exist, create it.
	if _, err := os.Stat(ContainerLogsDir); err != nil {
		if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
			glog.Errorf("Failed to create directory %q: %v", ContainerLogsDir, err)
		}
	}

	// Start the image manager.
	kl.imageManager.Start()

	// Start the certificate manager if it was enabled.
	if kl.serverCertificateManager != nil {
		kl.serverCertificateManager.Start()
	}

	// Start out of memory watcher.
	if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
		return fmt.Errorf("Failed to start OOM watcher %v", err)
	}
4.4. syncNetworkUtil
syncNetworkUtil synchronizes the iptables util rules in a loop; note, however, that in the current code the function does not actually perform any operation.
// Start loop to sync iptables util rules
if kl.makeIPTablesUtilChains {
	go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
}
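Every background task in Run follows the same pattern: a plain function is handed to wait.Until, which re-invokes it every period until the stop channel closes (wait.NeverStop never closes, so these loops run for the life of the process). A minimal standalone sketch of that pattern, with a placeholder task standing in for syncNetworkUtil:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})

	// Close the stop channel after ~3s so the example terminates;
	// the kubelet passes wait.NeverStop instead.
	go func() {
		time.Sleep(3 * time.Second)
		close(stopCh)
	}()

	// wait.Until re-runs the function every second until stopCh closes.
	wait.Until(func() {
		fmt.Println("sync iptables util rules (placeholder task)")
	}, 1*time.Second, stopCh)
}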
4.5. podKiller
When a pod is not handled properly by the pod workers, a goroutine is started that is responsible for killing it.
// Start a goroutine responsible for killing pods (that are not properly
// handled by pod workers).
go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)
// podKiller launches a goroutine to kill a pod received from the channel if
// another goroutine isn't already in action.
func (kl *Kubelet) podKiller() {
	killing := sets.NewString()
	// guard for the killing set
	lock := sync.Mutex{}
	for podPair := range kl.podKillingCh {
		runningPod := podPair.RunningPod
		apiPod := podPair.APIPod

		lock.Lock()
		exists := killing.Has(string(runningPod.ID))
		if !exists {
			killing.Insert(string(runningPod.ID))
		}
		lock.Unlock()

		if !exists {
			go func(apiPod *v1.Pod, runningPod *kubecontainer.Pod) {
				glog.V(2).Infof("Killing unwanted pod %q", runningPod.Name)
				err := kl.killPod(apiPod, runningPod, nil, nil)
				if err != nil {
					glog.Errorf("Failed killing the pod %q: %v", runningPod.Name, err)
				}
				lock.Lock()
				killing.Delete(string(runningPod.ID))
				lock.Unlock()
			}(apiPod, runningPod)
		}
	}
}
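The killing set guarded by the mutex is what prevents the same pod from being killed by two goroutines at once: a pod ID is inserted before its killer goroutine starts and removed when it finishes, so duplicates arriving on the channel in the meantime are dropped. The same pattern in isolation, using a hypothetical doWork callback and a plain map in place of sets.String:

package main

import (
	"fmt"
	"sync"
	"time"
)

// dispatcher drains a channel of IDs and starts at most one goroutine
// per ID at a time, mirroring podKiller's dedup logic.
func dispatcher(ch <-chan string, doWork func(id string)) {
	inFlight := map[string]bool{}
	var mu sync.Mutex

	for id := range ch {
		mu.Lock()
		exists := inFlight[id]
		if !exists {
			inFlight[id] = true
		}
		mu.Unlock()

		if !exists {
			go func(id string) {
				doWork(id)
				mu.Lock()
				delete(inFlight, id)
				mu.Unlock()
			}(id)
		}
	}
}

func main() {
	ch := make(chan string)
	go dispatcher(ch, func(id string) {
		fmt.Println("killing", id)
		time.Sleep(100 * time.Millisecond)
	})
	// "pod-a" is queued twice but only killed once while in flight.
	ch <- "pod-a"
	ch <- "pod-a"
	ch <- "pod-b"
	time.Sleep(time.Second)
	close(ch)
}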
4.6. statusManager

The status manager syncs pod status with the apiserver; its Start method is shown below.

func (m *manager) Start() {
	// Don't start the status manager if we don't have a client. This will happen
	// on the master, where the kubelet is responsible for bootstrapping the pods
	// of the master components.
	if m.kubeClient == nil {
		glog.Infof("Kubernetes client is nil, not starting status manager.")
		return
	}

	glog.Info("Starting to sync pod status with apiserver")
	syncTicker := time.Tick(syncPeriod)
	// syncPod and syncBatch share the same go routine to avoid sync races.
	go wait.Forever(func() {
		select {
		case syncRequest := <-m.podStatusChannel:
			glog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
				syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
			m.syncPod(syncRequest.podUID, syncRequest.status)
		case <-syncTicker:
			m.syncBatch()
		}
	}, 0)
}
4.7. probeManager
The probe manager handles container probes.
kl.probeManager.Start()
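The probe manager is a sizable component of its own; conceptually, each probe boils down to a small check function run on its own timer, with results reported over a channel. A heavily simplified sketch in that spirit (the check function, period, and result channel are invented for illustration and are not the kubelet's actual probe worker API):

package main

import (
	"fmt"
	"net/http"
	"time"
)

// probe runs check every period and reports pass/fail on results
// until stopCh closes — a drastically simplified probe worker.
func probe(check func() error, period time.Duration, results chan<- bool, stopCh <-chan struct{}) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			results <- check() == nil
		case <-stopCh:
			return
		}
	}
}

func main() {
	results := make(chan bool)
	stopCh := make(chan struct{})

	// An HTTP GET check, analogous in spirit to an httpGet liveness probe.
	go probe(func() error {
		resp, err := http.Get("http://localhost:8080/healthz")
		if err != nil {
			return err
		}
		resp.Body.Close()
		return nil
	}, time.Second, results, stopCh)

	for i := 0; i < 3; i++ {
		fmt.Println("healthy:", <-results)
	}
	close(stopCh)
}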
4.8. runtimeClassManager
// Start syncing RuntimeClasses if enabled.
if kl.runtimeClassManager != nil {
	go kl.runtimeClassManager.Run(wait.NeverStop)
}
4.9. PodLifecycleEventGenerator
// Start the pod lifecycle event generator.
kl.pleg.Start()
PodLifecycleEventGenerator is an interface for generating pod lifecycle events, defined as follows:
// PodLifecycleEventGenerator contains functions for generating pod life cycle events.
type PodLifecycleEventGenerator interface {
	Start()
	Watch() chan *PodLifecycleEvent
	Healthy() (bool, error)
}
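Any type providing these three methods satisfies the interface. A toy in-memory implementation makes the contract concrete (the event fields and one-second ticker are invented; the real GenericPLEG derives events by periodically relisting the container runtime, as shown next):

package main

import (
	"fmt"
	"time"
)

type PodLifecycleEvent struct {
	PodID string
	Type  string
}

// fakePLEG satisfies the same three-method contract as the kubelet's
// PodLifecycleEventGenerator, emitting a synthetic event every second.
type fakePLEG struct {
	ch chan *PodLifecycleEvent
}

func (g *fakePLEG) Start() {
	go func() {
		for range time.Tick(time.Second) {
			g.ch <- &PodLifecycleEvent{PodID: "pod-a", Type: "ContainerStarted"}
		}
	}()
}

func (g *fakePLEG) Watch() chan *PodLifecycleEvent { return g.ch }
func (g *fakePLEG) Healthy() (bool, error)         { return true, nil }

func main() {
	g := &fakePLEG{ch: make(chan *PodLifecycleEvent, 10)}
	g.Start()
	// Consumers receive events from Watch(), as syncLoop does with plegCh.
	events := g.Watch()
	for i := 0; i < 3; i++ {
		ev := <-events
		fmt.Printf("event: %s %s\n", ev.PodID, ev.Type)
	}
}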
In the kubelet, GenericPLEG implements this interface; its Start method is implemented as follows:
// Start spawns a goroutine to relist periodically.
func (g *GenericPLEG) Start() {
	go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
}
// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	glog.Info("Starting kubelet main sync loop.")
	// The resyncTicker wakes up kubelet to checks if there are any pod workers
	// that need to be sync'd. A one-second period is sufficient because the
	// sync interval is defaulted to 10s.
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()
	plegCh := kl.pleg.Watch()
	const (
		base   = 100 * time.Millisecond
		max    = 5 * time.Second
		factor = 2
	)
	duration := base
	for {
		if rs := kl.runtimeState.runtimeErrors(); len(rs) != 0 {
			glog.Infof("skipping pod synchronization - %v", rs)
			// exponential backoff
			time.Sleep(duration)
			duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
			continue
		}
		// reset backoff if we have a success
		duration = base
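The backoff arithmetic at the top of the loop is worth tracing: while the runtime reports errors, the sleep doubles each iteration from the 100ms base and is clamped at 5s; the first clean pass snaps it back to base. The same arithmetic in isolation:

package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	const (
		base   = 100 * time.Millisecond
		max    = 5 * time.Second
		factor = 2
	)
	duration := base
	// Simulate 8 consecutive failures: 100ms, 200ms, 400ms, ... capped at 5s.
	for i := 0; i < 8; i++ {
		fmt.Println("would sleep", duration)
		duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
	}
	// The first success resets the backoff to base.
	duration = base
	fmt.Println("after success:", duration)
}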