This article analyzes the kubelet by walking through its startup process, based on Kubernetes 1.18.2.

Entry point

Main process entry -> cmd/kubelet/app/server.go run() -> RunKubelet(). Everything before RunKubelet() is essentially argument validation and initialization, so the interesting work starts in RunKubelet().

Two parts deserve attention:

  1. createAndInitKubelet(), which initializes the Kubelet and starts garbage collection
  2. startKubelet(), which is the call that actually starts the Kubelet service
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {

	// ...

	// Initialize the Kubelet object and start the GC goroutines.
	// GC has two parts: containerGC and imageGC.
	k, err := createAndInitKubelet(/* ... */)
	if err != nil {
		return fmt.Errorf("failed to create kubelet: %v", err)
	}

	// NewMainKubelet should have set up a pod source config if one didn't exist
	// when the builder was run. This is just a precaution.
	if kubeDeps.PodConfig == nil {
		return fmt.Errorf("failed to create kubelet, pod source config was nil")
	}
	podCfg := kubeDeps.PodConfig

	rlimit.RlimitNumFiles(uint64(kubeServer.MaxOpenFiles))

	// process pods and exit.
	if runOnce {
		if _, err := k.RunOnce(podCfg.Updates()); err != nil {
			return fmt.Errorf("runonce failed: %v", err)
		}
		klog.Info("Started kubelet as runonce")
	} else {
		startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)
		klog.Info("Started kubelet")
	}
	return nil
}

GC

In createAndInitKubelet(/* ... */), once the Kubelet has been initialized, the following method is called directly to start garbage collection:

func (kl *Kubelet) StartGarbageCollection() {
	loggedContainerGCFailure := false
	go wait.Until(func() {
		if err := kl.containerGC.GarbageCollect(); err != nil {
			// failure handling elided...
		} else {
			// success handling elided...
		}
	}, ContainerGCPeriod, wait.NeverStop)

	// when the high threshold is set to 100, stub the image GC manager
	if kl.kubeletConfiguration.ImageGCHighThresholdPercent == 100 {
		return
	}

	prevImageGCFailed := false
	go wait.Until(func() {
		if err := kl.imageManager.GarbageCollect(); err != nil {
			// failure handling elided...
		} else {
			// success handling elided...
		}
	}, ImageGCPeriod, wait.NeverStop)
}
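
Both loops are built on wait.Until from k8s.io/apimachinery/pkg/util/wait, which runs the given function immediately and then once per period until the stop channel closes (wait.NeverStop never does); in 1.18, ContainerGCPeriod is one minute and ImageGCPeriod five minutes. A minimal self-contained sketch of the pattern, with an illustrative one-second period:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stop := make(chan struct{})
	// Runs the function right away, then every second, until stop is closed.
	go wait.Until(func() {
		fmt.Println("collecting garbage at", time.Now().Format(time.RFC3339))
	}, 1*time.Second, stop)

	time.Sleep(3500 * time.Millisecond) // expect roughly four invocations
	close(stop)
}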

startKubelet

The key calls in this function:

  1. k.Run()
  2. k.ListenAndServe()
  3. k.ListenAndServePodResources()
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) {
	// start the kubelet
	go k.Run(podCfg.Updates())

	// start the kubelet server
	if enableServer {
		go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, enableCAdvisorJSONEndpoints, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
	}
	if kubeCfg.ReadOnlyPort > 0 {
		go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), enableCAdvisorJSONEndpoints)
	}
	if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
		go k.ListenAndServePodResources()
	}
}

k.Run()

The core steps:

  1. kl.cloudResourceSyncManager.Run(wait.NeverStop) starts the cloud provider resource sync manager, which periodically fetches node information (such as addresses) from the cloud provider.
  2. kl.initializeModules() initializes internal modules that do not require the container runtime to be up; note that modules started here must not depend on modules that are not initialized here. The main work covers:
    1. metrics registration
    2. starting imageManager, which internally starts two goroutines: one periodically refreshes when each image was last used, the other periodically refreshes the cached list of images on the node; both exist to give imageGC its inputs
    3. starting the certificate manager; source comment: the certificate manager is responsible for being the authoritative source of certificates in the kubelet and handling updates due to rotation
    4. starting oomWatcher, which receives OOM events emitted by cAdvisor and records them (its main job is logging)
    5. starting resourceAnalyzer via ra.fsResourceAnalyzer.Start(), which computes and caches PodVolumeStats for every pod known to the kubelet
  3. kl.volumeManager.Run() starts the volume manager, mainly comprising:
    1. vm.desiredStateOfWorldPopulator; source comment: DesiredStateOfWorldPopulator periodically loops through the list of active pods and ensures that each one that has volumes exists in the desired state of the world cache. It also verifies that the pods in the desired state of the world still exist and removes them if not.
    2. vm.reconciler; source comment: the reconciler runs a periodic loop that reconciles the desired state of volumes with the actual state by triggering attach, detach, mount, and unmount operations.
    3. vm.volumePluginMgr and the CSIDriver informer, a control loop for CSIDriver resources (it contains no actual control logic; starting it just makes a local CSIDriver cache available)
  4. wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop) syncs node status; source comment: it synchronizes node status to the master if there is any change or enough time has passed since the last sync, registering the kubelet first if necessary.
  5. kl.nodeLeaseController.Run(wait.NeverStop) starts the node lease controller, which keeps the node's Lease object renewed so the control plane can tell the node is still alive.
  6. wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop); source comment: it initializes the modules that depend on the container runtime when the runtime first comes up, and returns an error if the status check fails. If the status check is OK, it updates the container runtime uptime in the kubelet runtimeState.
    6.1 kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules) initializes the runtime-dependent modules: cadvisor, containerManager, evictionManager, containerLogManager, and pluginManager (registering the pluginHandlers and starting it)
  7. kl.initNetworkUtil() watches the iptables rules to ensure the network utility chains exist on the host (a rough sketch of "ensuring" a chain follows this list):
    1. In the nat table, KUBE-MARK-DROP marks connections that should be dropped; marked connections are then dropped on the INPUT/OUTPUT chains of the filter table.
    2. In the nat table, KUBE-MARK-MASQ marks connections for SNAT; marked connections are then SNATed on the POSTROUTING chain of the nat table.
  8. wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop); comment: pods that should not be running are written to a channel, and podKiller starts goroutines to kill the pods received from that channel (see the sketch after the Run() listing below).
  9. kl.statusManager.Start() starts the pod status manager, which pushes pod status updates to the apiserver.
  10. kl.probeManager.Start() starts the probe manager.
  11. kl.runtimeClassManager.Start(wait.NeverStop) starts the RuntimeClass manager.
  12. kl.pleg.Start() starts the PodLifecycleEventGenerator, covered in its own section below.
  13. kl.syncLoop(updates, kl) enters the main sync loop, also covered below.
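
For item 7, "ensuring" a chain and its rule is idempotent: create the chain if it is missing, check whether the rule exists, and append it only if it does not. A rough illustrative sketch of those semantics using raw iptables commands; the real kubelet goes through pkg/util/iptables rather than shelling out like this, and the 0x8000/0x4000 mark bits shown are the defaults:

package main

import (
	"fmt"
	"os/exec"
)

// ensureChain creates the chain if missing and appends the rule only if absent,
// roughly the "ensure" semantics the kubelet gets from pkg/util/iptables.
func ensureChain(table, chain string, rule ...string) error {
	// -N fails if the chain already exists; ignore that error.
	exec.Command("iptables", "-t", table, "-N", chain).Run()
	// -C checks whether the rule exists; append it only when the check fails.
	args := append([]string{"-t", table, "-C", chain}, rule...)
	if err := exec.Command("iptables", args...).Run(); err != nil {
		args[2] = "-A"
		return exec.Command("iptables", args...).Run()
	}
	return nil
}

func main() {
	// KUBE-MARK-DROP tags packets with 0x8000; the filter table later drops them.
	if err := ensureChain("nat", "KUBE-MARK-DROP", "-j", "MARK", "--set-xmark", "0x8000/0x8000"); err != nil {
		fmt.Println("ensure KUBE-MARK-DROP:", err)
	}
	// KUBE-MARK-MASQ tags packets with 0x4000; nat POSTROUTING later masquerades them.
	if err := ensureChain("nat", "KUBE-MARK-MASQ", "-j", "MARK", "--set-xmark", "0x4000/0x4000"); err != nil {
		fmt.Println("ensure KUBE-MARK-MASQ:", err)
	}
}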
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {

	// ...
	// Start the cloud provider sync manager
	if kl.cloudResourceSyncManager != nil {
		go kl.cloudResourceSyncManager.Run(wait.NeverStop)
	}

	if err := kl.initializeModules(); err != nil {
		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
		klog.Fatal(err)
	}

	// Start volume manager
	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

	if kl.kubeClient != nil {
		// Start syncing node status immediately, this may set up things the runtime needs to run.
		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
		go kl.fastStatusUpdateOnce()

		// start syncing lease
		go kl.nodeLeaseController.Run(wait.NeverStop)
	}
	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

	// Set up iptables util rules
	if kl.makeIPTablesUtilChains {
		kl.initNetworkUtil()
	}

	// Start a goroutine responsible for killing pods (that are not properly
	// handled by pod workers).
	go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)

	// Start component sync loops.
	kl.statusManager.Start()
	kl.probeManager.Start()

	// Start syncing RuntimeClasses if enabled.
	if kl.runtimeClassManager != nil {
		kl.runtimeClassManager.Start(wait.NeverStop)
	}

	// Start the pod lifecycle event generator.
	kl.pleg.Start()
	kl.syncLoop(updates, kl)
}
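
As noted in item 8 above, podKiller is a plain channel consumer: other components enqueue pods that should not be running, and podKiller kills each one in its own goroutine while skipping pods whose kill is still in flight. A minimal sketch of that pattern; the pod type and kill function here are illustrative stand-ins, not the kubelet's actual types:

package main

import (
	"fmt"
	"sync"
	"time"
)

// pod stands in for the kubelet's internal pod type (illustrative only).
type pod struct{ uid string }

// podKiller drains ch and kills each received pod in its own goroutine,
// deduplicating pods whose kill is still in flight.
func podKiller(ch <-chan pod, kill func(pod)) {
	var mu sync.Mutex
	killing := map[string]bool{} // pods currently being killed
	for p := range ch {
		mu.Lock()
		inFlight := killing[p.uid]
		if !inFlight {
			killing[p.uid] = true
		}
		mu.Unlock()
		if inFlight {
			continue // a kill for this pod is already running
		}
		go func(p pod) {
			kill(p)
			mu.Lock()
			delete(killing, p.uid)
			mu.Unlock()
		}(p)
	}
}

func main() {
	ch := make(chan pod)
	go podKiller(ch, func(p pod) { fmt.Println("killing", p.uid) })
	ch <- pod{uid: "pod-a"}
	ch <- pod{uid: "pod-b"}
	close(ch)
	time.Sleep(100 * time.Millisecond) // let the kill goroutines finish
}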

kubelet.pleg.Start

PLEG (the PodLifecycleEventGenerator) lists pods/containers at a fixed interval, compares the new list with the old one, and generates events accordingly. Core logic:

  1. Fetch pods through the kubecontainer.Runtime interface (container information queried live from the underlying container runtime and wrapped into simple pod objects). These are not the Pod resources of the Kubernetes API; they carry far less data, essentially just a set of containers. (With the default Docker runtime, the RPC chain is runtime -> dockershim -> dockerd.)
  2. Compare the old and new pods and generate an event for each container whose containerState changed (a sketch of this mapping follows the listing below).
  3. Iterate over the events and send them to eventChannel, with reinspection (ReInspect) logic interleaved.

Finally, eventChannel is consumed in kubelet.Run -> kubelet.syncLoop.

// Start spawns a goroutine to relist periodically.
func (g *GenericPLEG) Start() {
	go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
}

// relist queries the container runtime for list of pods/containers, compare
// with the internal pods/containers, and generates events accordingly.
func (g *GenericPLEG) relist() {
	klog.V(5).Infof("GenericPLEG: Relisting")

	if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() {
		metrics.PLEGRelistInterval.Observe(metrics.SinceInSeconds(lastRelistTime))
	}

	timestamp := g.clock.Now()
	defer func() {
		metrics.PLEGRelistDuration.Observe(metrics.SinceInSeconds(timestamp))
	}()

	// 1.
	podList, err := g.runtime.GetPods(true)
	if err != nil {
		klog.Errorf("GenericPLEG: Unable to retrieve pods: %v", err)
		return
	}

	g.updateRelistTime(timestamp)

	pods := kubecontainer.Pods(podList)
	// update running pod and container count
	updateRunningPodAndContainerMetrics(pods)
	g.podRecords.setCurrent(pods)

	// 2. Compare the old and new pods and generate PodLifecycleEvents.
	eventsByPodID := map[types.UID][]*PodLifecycleEvent{}
	for pid := range g.podRecords {
		oldPod := g.podRecords.getOld(pid)
		pod := g.podRecords.getCurrent(pid)
		// Get all containers in the old and the new pod.
		allContainers := getContainersFromPods(oldPod, pod)
		for _, container := range allContainers {
			events := computeEvents(oldPod, pod, &container.ID)
			for _, e := range events {
				updateEvents(eventsByPodID, e)
			}
		}
	}

	var needsReinspection map[types.UID]*kubecontainer.Pod
	if g.cacheEnabled() {
		needsReinspection = make(map[types.UID]*kubecontainer.Pod)
	}

	// 3.
	for pid, events := range eventsByPodID {
		pod := g.podRecords.getCurrent(pid)
		if g.cacheEnabled() {
			if err := g.updateCache(pod, pid); err != nil {
				// Rely on updateCache calling GetPodStatus to log the actual error.
				klog.V(4).Infof("PLEG: Ignoring events for pod %s/%s: %v", pod.Name, pod.Namespace, err)
				needsReinspection[pid] = pod

				continue
			} else {
				delete(g.podsToReinspect, pid)
			}
		}
		// Update the internal storage and send out the events.
		g.podRecords.update(pid)
		for i := range events {
			// Filter out events that are not reliable and no other components use yet.
			if events[i].Type == ContainerChanged {
				continue
			}
			select {
			case g.eventChannel <- events[i]:
			default:
				metrics.PLEGDiscardEvents.Inc()
				klog.Error("event channel is full, discard this relist() cycle event")
			}
		}
	}

	if g.cacheEnabled() {
		// reinspect any pods that failed inspection during the previous relist
		if len(g.podsToReinspect) > 0 {
			klog.V(5).Infof("GenericPLEG: Reinspecting pods that previously failed inspection")
			for pid, pod := range g.podsToReinspect {
				if err := g.updateCache(pod, pid); err != nil {
					// Rely on updateCache calling GetPodStatus to log the actual error.
					klog.V(5).Infof("PLEG: pod %s/%s failed reinspection: %v", pod.Name, pod.Namespace, err)
					needsReinspection[pid] = pod
				}
			}
		}

		// Update the cache timestamp. This needs to happen *after*
		// all pods have been properly updated in the cache.
		g.cache.UpdateTime(timestamp)
	}

	// make sure we retain the list of pods that need reinspecting the next time relist is called
	g.podsToReinspect = needsReinspection
}
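
Step 2 above bottoms out in computeEvents, which looks up a container's state in the old and new pod records and turns the transition into events via generateEvents. A simplified, self-contained sketch that follows the shape of pkg/kubelet/pleg/generic.go; the type stubs are stand-ins so it compiles on its own, and logging is elided, so treat it as an approximation rather than the verbatim source:

package main

import "fmt"

// Minimal stand-ins for the pleg package's types.
type UID string

type PodLifecycleEventType string

const (
	ContainerStarted PodLifecycleEventType = "ContainerStarted"
	ContainerDied    PodLifecycleEventType = "ContainerDied"
	ContainerRemoved PodLifecycleEventType = "ContainerRemoved"
	ContainerChanged PodLifecycleEventType = "ContainerChanged"
)

type PodLifecycleEvent struct {
	ID   UID
	Type PodLifecycleEventType
	Data interface{}
}

type plegContainerState string

const (
	plegContainerRunning     plegContainerState = "running"
	plegContainerExited      plegContainerState = "exited"
	plegContainerUnknown     plegContainerState = "unknown"
	plegContainerNonExistent plegContainerState = "non-existent"
)

// generateEvents maps a container's old/new state pair to PLEG events.
func generateEvents(podID UID, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent {
	if newState == oldState {
		return nil // no transition, no event
	}
	switch newState {
	case plegContainerRunning:
		return []*PodLifecycleEvent{{ID: podID, Type: ContainerStarted, Data: cid}}
	case plegContainerExited:
		return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}}
	case plegContainerUnknown:
		return []*PodLifecycleEvent{{ID: podID, Type: ContainerChanged, Data: cid}}
	case plegContainerNonExistent:
		if oldState == plegContainerExited {
			// The death was already reported; only report the removal.
			return []*PodLifecycleEvent{{ID: podID, Type: ContainerRemoved, Data: cid}}
		}
		return []*PodLifecycleEvent{
			{ID: podID, Type: ContainerDied, Data: cid},
			{ID: podID, Type: ContainerRemoved, Data: cid},
		}
	}
	return nil
}

func main() {
	for _, e := range generateEvents("pod-1", "c1", plegContainerRunning, plegContainerExited) {
		fmt.Println(e.Type) // ContainerDied
	}
}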

kubelet.syncLoop

// syncLoop is the main loop for processing pod changes. It receives pod change
// events from three sources (file, apiserver, http). For any event it syncs the
// actual state with the desired state; with no events, it syncs at the
// configured period.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
	klog.Info("Starting kubelet main sync loop.")
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()
	plegCh := kl.pleg.Watch()
	const (
		base   = 100 * time.Millisecond
		max    = 5 * time.Second
		factor = 2
	)
	duration := base
	if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
		kl.dnsConfigurer.CheckLimitsForResolvConf()
	}

	for {
		if err := kl.runtimeState.runtimeErrors(); err != nil {
			klog.Errorf("skipping pod synchronization - %v", err)
			// exponential backoff
			time.Sleep(duration)
			duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
			continue
		}
		// reset backoff if we have a success
		duration = base

		kl.syncLoopMonitor.Store(kl.clock.Now())
		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
	}
}
// Core logic of syncLoopIteration, by event source:
// 1. configCh: other goroutines write PodUpdate objects here; the handler matching
//    the update's Op type is dispatched: ADD, UPDATE, REMOVE, RECONCILE, DELETE.
// 2. plegCh: written to by the PLEG goroutine; calls the handler's sync, and if a
//    container died, runs container cleanup.
// 3. syncCh: the sync period elapsed; calls the handler's sync.
// 4. livenessManager: a liveness probe reported a result; on failure, calls the
//    handler's sync.
// 5. housekeepingCh: the housekeeping period elapsed; runs pod cleanups.
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
		if !open {
			klog.Errorf("Update channel is closed. Exiting the sync loop.")
			return false
		}
		switch u.Op {
		case kubetypes.ADD:
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			handler.HandlePodReconcile(u.Pods)
		case kubetypes.DELETE:
			// DELETE is treated as a UPDATE because of graceful deletion.
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.RESTORE:
			// These are pods restored from the checkpoint. Treat them as new pods.
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.SET:
			// TODO: Do we want to support this?
			klog.Errorf("Kubelet does not support snapshot update")
		}
		if u.Op != kubetypes.RESTORE {
			kl.sourcesReady.AddSource(u.Source)
		}
	case e := <-plegCh:
		if isSyncPodWorthy(e) {
			// PLEG event for a pod; sync it.
			if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
				handler.HandlePodSyncs([]*v1.Pod{pod})
			} else {
				// If the pod no longer exists, ignore the event.
			}
		}

		if e.Type == pleg.ContainerDied {
			if containerID, ok := e.Data.(string); ok {
				kl.cleanUpContainersInPod(e.ID, containerID)
			}
		}
	case <-syncCh:
		// Sync pods waiting for sync
		podsToSync := kl.getPodsToSync()
		if len(podsToSync) == 0 {
			break
		}
		handler.HandlePodSyncs(podsToSync)
	case update := <-kl.livenessManager.Updates():
		if update.Result == proberesults.Failure {
			pod, ok := kl.podManager.GetPodByUID(update.PodUID)
			if !ok {
				// If the pod no longer exists, ignore the update.
				break
			}
			handler.HandlePodSyncs([]*v1.Pod{pod})
		}
	case <-housekeepingCh:
		if !kl.sourcesReady.AllReady() {
			// Sources aren't ready yet; skip housekeeping so pods from
			// unready sources aren't accidentally cleaned up.
		} else {
			if err := handler.HandlePodCleanups(); err != nil {
				klog.Errorf("Failed cleaning pods: %v", err)
			}
		}
	}
	return true
}
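
One detail worth calling out from syncLoop: while runtimeErrors() keeps failing, the loop sleeps with exponential backoff from 100ms up to a 5s cap (100ms, 200ms, 400ms, 800ms, 1.6s, 3.2s, 5s, 5s, ...), and a single success resets it to the base. A standalone sketch of just that backoff arithmetic:

package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	const (
		base   = 100 * time.Millisecond
		max    = 5 * time.Second
		factor = 2
	)
	duration := base
	for i := 0; i < 8; i++ {
		fmt.Println(duration) // 100ms 200ms 400ms 800ms 1.6s 3.2s 5s 5s
		duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
	}
}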