// NewMainKubelet should have set up a pod source config if one didn't exist
// when the builder was run. This is just a precaution.
if kubeDeps.PodConfig == nil {
    return fmt.Errorf("failed to create kubelet, pod source config was nil")
}
podCfg := kubeDeps.PodConfig
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) {
    // start the kubelet
    go k.Run(podCfg.Updates())

    // start the kubelet server
    if enableServer {
        go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, enableCAdvisorJSONEndpoints, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
    }
    if kubeCfg.ReadOnlyPort > 0 {
        go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), enableCAdvisorJSONEndpoints)
    }
    if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
        go k.ListenAndServePodResources()
    }
}
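All of the kubelet's pod sources (apiserver watch, static manifest files, HTTP endpoint) are merged by PodConfig into the single channel that podCfg.Updates() returns, which is why k.Run needs only one argument. Below is a minimal, self-contained sketch of that fan-in pattern; the type podUpdate and function mergeSources are hypothetical stand-ins for illustration, not the real kubetypes.PodUpdate or config.PodConfig API:

package main

import (
    "fmt"
    "sync"
)

// podUpdate is a stand-in for kubetypes.PodUpdate (illustration only).
type podUpdate struct {
    Source string
    Op     string
}

// mergeSources fans updates from several per-source channels into one
// channel, mirroring the role PodConfig.Updates() plays for the kubelet.
func mergeSources(sources ...<-chan podUpdate) <-chan podUpdate {
    merged := make(chan podUpdate)
    var wg sync.WaitGroup
    for _, src := range sources {
        wg.Add(1)
        go func(ch <-chan podUpdate) {
            defer wg.Done()
            for u := range ch {
                merged <- u
            }
        }(src)
    }
    // Close the merged channel once every source channel is drained.
    go func() {
        wg.Wait()
        close(merged)
    }()
    return merged
}

func main() {
    api := make(chan podUpdate, 1)
    file := make(chan podUpdate, 1)
    api <- podUpdate{Source: "api", Op: "ADD"}
    file <- podUpdate{Source: "file", Op: "ADD"}
    close(api)
    close(file)
    for u := range mergeSources(api, file) {
        fmt.Printf("%s from %s\n", u.Op, u.Source)
    }
}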
// Start volume manager
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
if kl.kubeClient != nil {
    // Start syncing node status immediately, this may set up things the runtime needs to run.
    go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
    go kl.fastStatusUpdateOnce()

    // start syncing lease
    go kl.nodeLeaseController.Run(wait.NeverStop)
}
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
// Set up iptables util rules
if kl.makeIPTablesUtilChains {
    kl.initNetworkUtil()
}
// Start a goroutine responsible for killing pods (that are not properly
// handled by pod workers).
go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)
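Almost every background task above is launched with the same primitive: wait.Until from k8s.io/apimachinery/pkg/util/wait runs a function immediately and then once per period until the stop channel is closed (wait.NeverStop is simply a channel that is never closed). A small standalone example of the pattern:

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    stopCh := make(chan struct{})
    // Stop the loop after roughly three iterations.
    go func() {
        time.Sleep(350 * time.Millisecond)
        close(stopCh)
    }()
    // wait.Until runs the function immediately, then every period,
    // until stopCh is closed.
    wait.Until(func() {
        fmt.Println("periodic work, like kl.updateRuntimeUp or kl.podKiller")
    }, 100*time.Millisecond, stopCh)
}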
// Start spawns a goroutine to relist periodically.
func (g *GenericPLEG) Start() {
    go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
}

// relist queries the container runtime for list of pods/containers, compare
// with the internal pods/containers, and generates events accordingly.
func (g *GenericPLEG) relist() {
    klog.V(5).Infof("GenericPLEG: Relisting")

    if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() {
        metrics.PLEGRelistInterval.Observe(metrics.SinceInSeconds(lastRelistTime))
    }
    // 1. Query the runtime for the current list of pods/containers and
    // record them as the "current" snapshot in podRecords.
    timestamp := g.clock.Now()
    podList, err := g.runtime.GetPods(true)
    if err != nil {
        klog.Errorf("GenericPLEG: Unable to retrieve pods: %v", err)
        return
    }

    g.updateRelistTime(timestamp)

    pods := kubecontainer.Pods(podList)
    // update running pod and container count
    updateRunningPodAndContainerMetrics(pods)
    g.podRecords.setCurrent(pods)
    // 2. Compare the old and current pods, and generate a PodLifecycleEvent
    // for every observed container state transition.
    eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
    for pid := range g.podRecords {
        oldPod := g.podRecords.getOld(pid)
        pod := g.podRecords.getCurrent(pid)
        // Get all containers in the old and the new pod.
        allContainers := getContainersFromPods(oldPod, pod)
        for _, container := range allContainers {
            events := computeEvents(oldPod, pod, &container.ID)
            for _, e := range events {
                updateEvents(eventsByPodID, e)
            }
        }
    }
    var needsReinspection map[types.UID]*kubecontainer.Pod
    if g.cacheEnabled() {
        needsReinspection = make(map[types.UID]*kubecontainer.Pod)
    }
    // 3. For each pod with events: update the cache (if enabled), then
    // publish the events on the event channel.
    for pid, events := range eventsByPodID {
        pod := g.podRecords.getCurrent(pid)
        if g.cacheEnabled() {
            if err := g.updateCache(pod, pid); err != nil {
                // Rely on updateCache calling GetPodStatus to log the actual error.
                klog.V(4).Infof("PLEG: Ignoring events for pod %s/%s: %v", pod.Name, pod.Namespace, err)
                needsReinspection[pid] = pod

                continue
            } else {
                delete(g.podsToReinspect, pid)
            }
        }
        // Update the internal storage and send out the events.
        g.podRecords.update(pid)
        for i := range events {
            // Filter out events that are not reliable and no other components use yet.
            if events[i].Type == ContainerChanged {
                continue
            }
            select {
            case g.eventChannel <- events[i]:
            default:
                metrics.PLEGDiscardEvents.Inc()
                klog.Error("event channel is full, discard this relist() cycle event")
            }
        }
    }
    if g.cacheEnabled() {
        // reinspect any pods that failed inspection during the previous relist
        if len(g.podsToReinspect) > 0 {
            klog.V(5).Infof("GenericPLEG: Reinspecting pods that previously failed inspection")
            for pid, pod := range g.podsToReinspect {
                if err := g.updateCache(pod, pid); err != nil {
                    // Rely on updateCache calling GetPodStatus to log the actual error.
                    klog.V(5).Infof("PLEG: pod %s/%s failed reinspection: %v", pod.Name, pod.Namespace, err)
                    needsReinspection[pid] = pod
                }
            }
        }

        // Update the cache timestamp. This needs to happen *after*
        // all pods have been properly updated in the cache.
        g.cache.UpdateTime(timestamp)
    }

    // make sure we retain the list of pods that need reinspecting the next time relist is called
    g.podsToReinspect = needsReinspection
}
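The heart of step 2 is computeEvents, which boils down to diffing a container's old and new state and emitting the matching event type. The following is a simplified, self-contained sketch of that idea; the types and names here are illustrative stand-ins, and the real generateEvents in pkg/kubelet/pleg/generic.go also covers additional states such as unknown and non-existent containers:

package main

import "fmt"

// Minimal stand-ins for the pleg package's internal types (illustration only).
type containerState string

const (
    stateRunning containerState = "running"
    stateExited  containerState = "exited"
)

type lifecycleEvent struct {
    PodID string
    Type  string // e.g. "ContainerStarted", "ContainerDied"
    Data  string // container ID
}

// diffContainerState mirrors the idea behind PLEG's event generation:
// a state transition between two relists maps to at most one event.
func diffContainerState(podID, cid string, oldState, newState containerState) []lifecycleEvent {
    if newState == oldState {
        return nil // no transition, no event
    }
    switch newState {
    case stateRunning:
        return []lifecycleEvent{{PodID: podID, Type: "ContainerStarted", Data: cid}}
    case stateExited:
        return []lifecycleEvent{{PodID: podID, Type: "ContainerDied", Data: cid}}
    }
    return nil
}

func main() {
    for _, e := range diffContainerState("pod-1", "c1", stateRunning, stateExited) {
        fmt.Printf("%s: %s (%s)\n", e.PodID, e.Type, e.Data)
    }
}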
    for {
        if err := kl.runtimeState.runtimeErrors(); err != nil {
            klog.Errorf("skipping pod synchronization - %v", err)
            // exponential backoff
            time.Sleep(duration)
            duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
            continue
        }
        // reset backoff if we have a success
        duration = base
        kl.syncLoopMonitor.Store(kl.clock.Now())
        if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
            break
        }
        kl.syncLoopMonitor.Store(kl.clock.Now())
    }
}

// Core logic of syncLoopIteration -- it selects over five event sources:
// 1. configCh: other goroutines deliver PodUpdate objects here; the handler
//    matching the update's Op type is invoked: ADD, UPDATE, REMOVE,
//    RECONCILE, DELETE.
// 2. plegCh: written by the PLEG goroutine; calls handler.HandlePodSyncs,
//    and if a container died, runs the container cleanup routine.
// 3. syncCh: the periodic sync interval fired; calls handler.HandlePodSyncs
//    for the pods waiting to be synced.
// 4. livenessManager.Updates(): the liveness manager reported a probe
//    failure; calls handler.HandlePodSyncs for the affected pod.
// 5. housekeepingCh: the housekeeping interval fired; calls
//    handler.HandlePodCleanups once all pod sources are ready.
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler, syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case u, open := <-configCh:
        if !open {
            klog.Errorf("Update channel is closed. Exiting the sync loop.")
            return false
        }
        switch u.Op {
        case kubetypes.ADD:
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.UPDATE:
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.REMOVE:
            handler.HandlePodRemoves(u.Pods)
        case kubetypes.RECONCILE:
            handler.HandlePodReconcile(u.Pods)
        case kubetypes.DELETE:
            // DELETE is treated as a UPDATE because of graceful deletion.
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.RESTORE:
            // These are pods restored from the checkpoint. Treat them as new pods.
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.SET:
            // TODO: Do we want to support this?
            klog.Errorf("Kubelet does not support snapshot update")
        }
        if u.Op != kubetypes.RESTORE {
            kl.sourcesReady.AddSource(u.Source)
        }
    case e := <-plegCh:
        if isSyncPodWorthy(e) {
            // PLEG event for a pod; sync it.
            if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
                handler.HandlePodSyncs([]*v1.Pod{pod})
            } else {
                // If the pod no longer exists, ignore the event.
            }
        }
        if e.Type == pleg.ContainerDied {
            if containerID, ok := e.Data.(string); ok {
                kl.cleanUpContainersInPod(e.ID, containerID)
            }
        }
    case <-syncCh:
        // Sync pods waiting for sync
        podsToSync := kl.getPodsToSync()
        if len(podsToSync) == 0 {
            break
        }
        handler.HandlePodSyncs(podsToSync)
    case update := <-kl.livenessManager.Updates():
        if update.Result == proberesults.Failure {
            pod, ok := kl.podManager.GetPodByUID(update.PodUID)
            if !ok {
                // If the pod no longer exists, ignore the update.
                break
            }
            handler.HandlePodSyncs([]*v1.Pod{pod})
        }
    case <-housekeepingCh:
        if !kl.sourcesReady.AllReady() {
            // Skip housekeeping until all pod sources are ready, so pods
            // from not-yet-ready sources are not deleted by mistake.
        } else {
            if err := handler.HandlePodCleanups(); err != nil {
                klog.Errorf("Failed cleaning pods: %v", err)
            }
        }
    }
    return true
}
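The backoff arithmetic in syncLoop above (duration = min(max, factor*duration)) doubles the sleep after each failed runtime check and caps it, so a broken runtime is re-probed quickly at first and then at a bounded rate. A standalone sketch of the same pattern, with illustrative base/max/factor values:

package main

import (
    "fmt"
    "math"
    "time"
)

func main() {
    // Illustrative constants; syncLoop uses a similar base/max/factor trio.
    const (
        base   = 100 * time.Millisecond
        max    = 5 * time.Second
        factor = 2
    )
    duration := base
    for i := 0; i < 8; i++ {
        fmt.Println(duration) // 100ms, 200ms, 400ms, ... capped at 5s
        duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
    }
}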