k8s源码阅读03——controller-manager 之 deploymentController
/ / 点击控制器模型
在K8S中,控制器的代码都遵循了一个通用的编排模式:控制循环。例如,在一个待编排的对象R,它有一个对应的控制器,那么,可以用以下伪代码来描述这个控制循环:
1 | for{ |
实际上,deployment控制replicaSet资源对象,然后replicaSet资源对象再来控制pod对象,从而实现了deployment对Pod对象的管理。在deploymentController的源码中也有对replicaSet和pod的控制循环,因为这个过程中涉及到对这两个对象的修改或者依赖于这两个对象修改时要某些处理。

如上图,deployment直接控制对象是replicaSet,replicaSet来控制Pod。另外,在deploymentController中,有deploymentInformor、replicaSetInformor、podInformor三个用于监听这三种对象的变化情况,然后再将变化事件推给deploymentController控制循环,从而完成整个控制逻辑。
源码分析
基于K8S 1.16.2源码
控制启动入口
Controller-manager整个监控逻辑启动入口是 kubernetes/cmd/kube-controller-manager/controller-manager.go中。
deploymentController控制循环启动:
- 执行main函数,
cmd.Execute()内部最终会调用到cmd.Run方法(cmd/kube-controller-manager/app/controllermanager.go 108行) - 该方法中,最后再执行
Run方法,进入到 159行 - 该方法中246行执行
run方法,run方法在上边定义 - 在
run方法中,234行调用NewControllerInitializers()方法 373行 - 在
NewControllerInitializers方法中,注册了一系列controller的入口函数,关注 385行,controllers["deployment"] = startDeploymentController - 跳转到
startDeploymentController定义(cmd/kube-controller-manager/app/apps.go 86行),找到NewDeploymentController,通过此方法即可找到deploymentController的核心源码
deploymentController中主要有3个Informor控制循环来共同协作来实现功能,接下来从上到下挨个分析这三个控制循环的源码
控制循环
deployment如何到达控制循环中
在创建
deploymentController变量的时候,函数里定义了deployment变化事件Handler,在handler中,最终都会把deployment对象放入 dc.queue队列中,然后在 dc.processNextWorkItem()方法中对其消费,在该函数中调用了dc.syncHandler方法,该方法是在创建deploymentController对象的时候赋值的,执行体为dc.syncDeployment。
1 | // pkg/controller/deployment/deployment_controller.go 459行 |
即:只要将deployment对象放到dc.queue队列中,dc.processNextWorkItem方法便会从队列中取出数据,然后进入控制循环,接下来先讲一下,deployment对象入队的场景。
deploymentInformer
deploymentInformer 里,监控到deployment发生变化,直接进入控制循环,这点没有什么问题,因为deploymentController本身就是要来管理deployment的。以下代码为deploymentInformer变化事件handler的初始化。其中
dc.addDeployment、dc.updateDeployment、dc.deleteDeployment函数里的逻辑都比较简单,基本就是一些错误处理,最后把deployment 入队
1 | func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) { |
replicasetInformer
从上边控制器模型的图中可以看到,replicasetInformer有3种事件都反馈到deploymentController中,实际在replicasetInformer中,replicaSet的3种变化都会将其对应的deployment对象入队,使其进入控制循环。首先看下,informer中定义的事件handler:
1
2
3
4
5
6
7
8
9 func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
// ... /pkg/controller/deployment/deployment_controller.go 126行
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dc.addReplicaSet,
UpdateFunc: dc.updateReplicaSet,
DeleteFunc: dc.deleteReplicaSet,
})
// ...
}分别看下三种事件中的处理逻辑
add
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 func (dc *DeploymentController) addReplicaSet(obj interface{}) {
rs := obj.(*apps.ReplicaSet)
// ① 首先判断是否是已删除的rs,如果是,则调用 dc.deleteReplicaSet函数
// dc.deleteReplicaSet:如果找不到对应的管理这个rs 的deployment对象,则直接返回,否则将deployment入队,让其进入控制循环
if rs.DeletionTimestamp != nil {
// On a restart of the controller manager, it's possible for an object to
// show up in a state that is already pending deletion.
dc.deleteReplicaSet(rs)
return
}
// ② 如果rs有对应的deployment,则将其入队并退出,否则继续
// If it has a ControllerRef, that's all that matters.
if controllerRef := metav1.GetControllerOf(rs); controllerRef != nil {
d := dc.resolveControllerRef(rs.Namespace, controllerRef)
if d == nil {
return
}
klog.V(4).Infof("ReplicaSet %s added.", rs.Name)
dc.enqueueDeployment(d)
return
}
// ③ 孤儿rs,根据rs的label找到与其相同的deployment,如果没有则直接退出,否则继续
// Otherwise, it's an orphan. Get a list of all matching Deployments and sync
// them to see if anyone wants to adopt it.
ds := dc.getDeploymentsForReplicaSet(rs)
if len(ds) == 0 {
return
}
// ④ 将上一步找到的所有deployment都入队,交由deployment控制循环处理
klog.V(4).Infof("Orphan ReplicaSet %s added.", rs.Name)
for _, d := range ds {
dc.enqueueDeployment(d)
}
}
update
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45 func (dc *DeploymentController) updateReplicaSet(old, cur interface{}) {
curRS := cur.(*apps.ReplicaSet)
oldRS := old.(*apps.ReplicaSet)
if curRS.ResourceVersion == oldRS.ResourceVersion {
// Periodic resync will send update events for all known replica sets.
// Two different versions of the same replica set will always have different RVs.
return
}
// ① 更新前后,replicaset的控制者如果发生变化,而且更新前的控制在不为空,且找到旧的控制者,则将旧rs的控制者入队
// 这么做,是为了让旧rs的控制者知道rs脱离了控制,deployment会根据情况去判断是否再创建新的rs
curControllerRef := metav1.GetControllerOf(curRS)
oldControllerRef := metav1.GetControllerOf(oldRS)
controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
if controllerRefChanged && oldControllerRef != nil {
// The ControllerRef was changed. Sync the old controller, if any.
if d := dc.resolveControllerRef(oldRS.Namespace, oldControllerRef); d != nil {
dc.enqueueDeployment(d)
}
}
// ② 如果新rs的控制者不为空,则去找其控制者,找不到,则直接退出,找到则将其控制者入队
// If it has a ControllerRef, that's all that matters.
if curControllerRef != nil {
d := dc.resolveControllerRef(curRS.Namespace, curControllerRef)
if d == nil {
return
}
klog.V(4).Infof("ReplicaSet %s updated.", curRS.Name)
dc.enqueueDeployment(d)
return
}
// ③ 否则,它是一个孤儿rs,同步能够与其匹配的deployment,看是否哪一个现在想管理它
// Otherwise, it's an orphan. If anything changed, sync matching controllers
// to see if anyone wants to adopt it now.
labelChanged := !reflect.DeepEqual(curRS.Labels, oldRS.Labels)
if labelChanged || controllerRefChanged {
ds := dc.getDeploymentsForReplicaSet(curRS)
if len(ds) == 0 {
return
}
klog.V(4).Infof("Orphan ReplicaSet %s updated.", curRS.Name)
for _, d := range ds {
dc.enqueueDeployment(d)
}
}
}
delete
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 func (dc *DeploymentController) deleteReplicaSet(obj interface{}) {
rs, ok := obj.(*apps.ReplicaSet)
// ① obj 对象可能是apps.ReplicaSet 或 cache.DeletedFinalStateUnknown,这里先进行不是apps.ReplicaSet时的处理。
// 如果不是,则从 cache.DeletedFinalStateUnknown中取出rs对象,失败则返回,并报错,成功则继续
// When a delete is dropped, the relist will notice a pod in the store not
// in the list, leading to the insertion of a tombstone object which contains
// the deleted key/value. Note that this value might be stale. If the ReplicaSet
// changed labels the new deployment will not be woken up till the periodic resync.
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
return
}
rs, ok = tombstone.Obj.(*apps.ReplicaSet)
if !ok {
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a ReplicaSet %#v", obj))
return
}
}
// ② 获取rs的控制者,如果等于nil,则直接返回
// 可能原因:手动创建的rs,而不是通过deployment创建的rs
controllerRef := metav1.GetControllerOf(rs)
if controllerRef == nil {
// No controller should care about orphans being deleted.
return
}
// ③ 根据rsRef找到对应的控制者对象,如果为空,则直接退出,如果不为空,则将其入队
// 为空的原因:deployment删除时PropagationPolicy参数设置成background时,rs本身是记录了控制者信息,但是deployment被提前删除了。所以根据Ref信息获取不到存在的deployment
d := dc.resolveControllerRef(rs.Namespace, controllerRef)
if d == nil {
return
}
klog.V(4).Infof("ReplicaSet %s deleted.", rs.Name)
dc.enqueueDeployment(d)
}
PodInformer
1
2
3
4
5
6
7
8 func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
// ...
// pkg/controller/deployment/deployment_controller.go 131行
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: dc.deletePod,
})
// ...
}PodInformer中只有delete事件会通知到deployment,下边看下 dc.deletePod具体的逻辑:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 func (dc *DeploymentController) deletePod(obj interface{}) {
pod, ok := obj.(*v1.Pod)
// ① 同rs删除事件函数中的开始处理,obj对象可能是v1.Pod或DeletedFinalStateUnknown,如果是后者,则取出v1.Pod并继续
// When a delete is dropped, the relist will notice a pod in the store not
// in the list, leading to the insertion of a tombstone object which contains
// the deleted key/value. Note that this value might be stale. If the Pod
// changed labels the new deployment will not be woken up till the periodic resync.
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
return
}
pod, ok = tombstone.Obj.(*v1.Pod)
if !ok {
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v", obj))
return
}
}
klog.V(4).Infof("Pod %s deleted.", pod.Name)
// ② 根据pod找到对应的deployment,如果找到,而且 deployment的更新策略为Recreate,则进入if处理逻辑,否则结束
// 此处判断deployment更新策略是因为,只有当更新测量是Recreate时,deployment才会受pod的删除影响。
if d := dc.getDeploymentForPod(pod); d != nil && d.Spec.Strategy.Type == apps.RecreateDeploymentStrategyType {
// Sync if this Deployment now has no more Pods.
// 根据 deployment获取其控制的rs,和每个rs对应的pod(podMap存储)
rsList, err := util.ListReplicaSets(d, util.RsListFromClient(dc.client.AppsV1()))
if err != nil {
return
}
podMap, err := dc.getPodMapForDeployment(d, rsList)
if err != nil {
return
}
numPods := 0
// 累加所有rs的pod数,如果等于0,则将deployment入队
// Recreate更新策略:将所有pod都删除才创建新的Rs
for _, podList := range podMap {
numPods += len(podList)
}
if numPods == 0 {
dc.enqueueDeployment(d)
}
}
}
控制循环
1 | // pkg/controller/deployment/deployment_controller.go 562行 |
在源码中标记了 1-6个编号,重点关注这几处逻辑
①判断deployment选择器是否为空
deployment选择器如果为空,则其默认为选中所有,这在k8s中是不被允许的,所以做了短路处理,如果选择器为空,则直接return,不进行接下来的处理。
②判断删除标记
判断删除时间戳是否为空,如果不为空,说明该deployment已被删除,则不做处理,只更新状态调用到
dc.syncStatusOnly,重新计算deployment.status值,然后调用api执行更新操作。
③判断是否暂停
暂停操作相关命令:
1
2
3
4 暂停
kubectl rollout pause -n NAMESPACE deploy DEPLOYNAME
恢复
kubectl rollout resume -n NAMESPACE deploy DEPLOYNAME在暂停操作时,deploymentController是不对rs进行操作的,只有当恢复暂停时,才会进行操作。主要应用场景,多次修改deployment时,希望多次修改操作能够一次性提交更新,而不是每次保存配置时都更新。
回到源码,该部分逻辑很简单,如果判断结果是deployment处于暂停状态,那么调用到
dc.sync方法,该方法和scale时调用的方法是同一个,该方法里只会执行scale 和 更新deployment操作
④判断是否是rollback
该方法即将被移除掉。。
⑤判断是否是扩缩容引起的deployment变化
通过对比
deployment.template和newRs.template相等而且副本数不同来做扩缩容,那么直接调用dc.sync方法,处理扩缩容,否则,则继续
⑥根据deployment的更新策略调用不同的逻辑
这里主要看下 RollingUpdate的情况
此时,调用方法
dc.rolloutRolling,该方法源码如下,已加入详细注释
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40 // pkg/controller/deployment/rolling.go 31行
func (dc *DeploymentController) rolloutRolling(d *apps.Deployment, rsList []*apps.ReplicaSet) error {
// getAllReplicaSetsAndSyncRevision,该方法中,实现了很多deployment的管理rs逻辑
// 1. 版本回滚,版本回滚的时候,deployment.template被修改为指定版本的值,newRs其实是对比历史版本rs后,取rs.template与deployment.template相等的rs(同时会将rs初始化掉部分字段置空或按规则生成新值、修改版本号等)
// 2. 新建rs,调用方法第三个参数为,如果不存在newRs是否创建,此处传值为true,那么新建deployment的时候,里边的逻辑会去创建新rs
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
if err != nil {
return err
}
allRSs := append(oldRSs, newRS)
// 判断新Rs是否需要增加、减少副本数
// deployment控制pod的副本数、滚动更新 就是通过此处修改 rs的副本数间接管理,然后由rs去控制pod数量
scaledUp, err := dc.reconcileNewReplicaSet(allRSs, newRS, d)
if err != nil {
return err
}
if scaledUp {
// Update DeploymentStatus
return dc.syncRolloutStatus(allRSs, newRS, d)
}
// 判断旧Rs是否需要增加、减少副本数
// 滚动更新的时候有一定的策略,一边减少旧版本副本(下边代码)、一边增加新版本副本(上边)
scaledDown, err := dc.reconcileOldReplicaSets(allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
if err != nil {
return err
}
if scaledDown {
// Update DeploymentStatus
return dc.syncRolloutStatus(allRSs, newRS, d)
}
// 判断deployment 是否达到稳定状态(1.deployment实际状态与期望状态完全一致并可用,2.所有旧Pod都处于非Running状态)
// 如果稳定,那么执行clean逻辑,主要是清理历史replicaSet(如果配置了deployment.spec.RevisionHistoryLimit)
if deploymentutil.DeploymentComplete(d, &d.Status) {
if err := dc.cleanupDeployment(oldRSs, d); err != nil {
return err
}
}
// Sync deployment status
return dc.syncRolloutStatus(allRSs, newRS, d)
}