k8s源码阅读00——client-go之-controller(+简单实现)
/ / 点击本篇文章简单介绍controller实现过程中涉及到的client-go中的几个组件,并尝试使用简单的方式手撸一个可以实现一个类似client-go中
controller功能的demo。注:本文client-go是从
kubernetes 1.13的项目源码中拷贝出来的,文中标记的代码行数以此为准。
简介
NewIndexerInformer()是定义在
client-go/tools/cache/shared_informer中的一个函数,返回值为indexer,controller,本文主要讲controller的功能实现(也会简单提到indexer),它用来监听k8s集群中的某一种资源,针对资源对象的不同事件(add/update/delete)执行用户自定义的事件处理函数。其在k8s中的应用十分广泛
- k8s里的系统组件:k8s调度器里通过监听pod资源对象来对其进行调度、k8s的kube-proxy通过监听service/endpoints资源对象的变化来配置各个节点的网络等等;
CRD- 使用该组件对k8s中的资源进行状态监控(deployment、daemonSet、pod等)
- 使用该组件对k8s中的资源配额进行监控
- 其他
controller 基本组成部分介绍
先讲几个在
controller中引用到的几个结构体
Indexer
首先看一下如何
New一个Indexer对象:
1 |
|
可以看到,
NewIndexer返回的是一个cache对象,而cache实现了Indexer接口。
再结合cache的定义可以知道,此处的Indexer就是一个线程安全的存储,keyFunc的作用就是给定一个对象,然后返回该对象的key值。暂时简单地理解Indexer不去深究其内部其他逻辑,将其视为一个线程安全的map存储即可,这个map的key值可以通过调用keyFunc(val)获得。
ListWatch
首先看下 ListWatch的定义:
1 | // ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface. |
主要包括两个函数对象(先不去理会DisableChunking),
ListFunc作用主要是列出k8s集群中的资源对象,WatchFunc作用主要是监听k8s集群中的资源对象。先看下创建
ListWatch的代码吧:
1 | func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch { |
从代码中可用看到,这里创建了两个函数对象,ListFunc和WatchFunc,这两个函数返回的都是向apiserver发请求的对象,不过其发送请求的
Action不同,一个是Get,一个是Watch,那ListWatch对象如何知道去Get/Watch哪个集群的资源对象呢,这个就是通过参数c来指定了,k8s的ClientSet实现了Getter接口,可以作为参数传进来。到此处是不是感觉接触到了controller的核心部分了呢?到这里就算是比较接近底层的部分了,再向底层分析就是client-go向apiserver发请求和解析请求结果的部分了,深度上先到此为止
(在本篇中,我们不去关心更底层是如何Watch到资源的变化情况的,也不关心数据是怎么在client和apiserver中传输和解析的
只需知道ListWatch已经能够Get和Watch到集群的资源就够了)。
DeltaFIFO
DeltaFIFO,从名字可以看出这是一个变化的队列,且是先进先出的队列。稍微解释一下,这个队列实际包含两层队列。先看下定义的这部分:
1 | // client-go/tools/cache/delta_fifo.go 96行 |
先介绍一下 keyFunc,keyFunc和上边的Indexer里的keyFunc作用类似,都是根据对象返回一个key值,这里求得的key值主要用于 queue中的值,还有items这个map中的键值,从而,当从queue中取出队列头部元素时,可以根据这个元素值从items中取出对应的具体value
以下分析DeltaFIFO的两层队列实现:
首先,queue是一个队列,数组保存队列,也很容易实现先进先出,queue队列中元素出队时,从item中取出items[queue[0]]元素并删除,queue也删除0号元素即可完成队列的Pop操作。从Deltas的定义看到,这是一个Delta数组,items[queue[0]]也是一个队列,这个队列存的是Delta对象,既然它们存储在同一个items[key]下,那么表示它们是同一个资源对象的Delta,即:这个队列表示的是同一个对象的事件队列。综上: queue表示的是不同资源对象的队列,items的value值表示的是同一资源对象的事件队列。
有点拗口,想象一下这个场景: 假设有3个pod资源对象,记为 a,b,c,
- a发生变化deltaA1,此时a加入到queue队列,a的变化事件加到items[a]这个事件队列
- b发生变化deltaB1,此时b假如到queue队列,b的变化事件加到items[b]这个事件队列
- a又发生变化deltaA2,此时items中已经存在items[a]了,表示a已经在队列queue中了,此时,获取到items[a]事件队列,然后将这次变化加入队列
此时,queue和items中的数据为:
1 | queue : [a,b] |
如果上边步骤
3发生时,queue已经执行过一次出队操作,那么:
a发生变化deltaA2时,items中items[a]不存在,表示a不在队列中,便将a加到queue队列,deltaA2加到items[a]这个事件队列中去。此时,queue和items中的数据为:
1 | queue: [b,a] |
讲了这么多,大概应该能明白DeltaFIFO的作用了吧,简单点说就是存储k8s资源对象变化事件的队列。
controller
controller初始化
在controller里,存储了ListWatch,objType(监听的资源对象类型),ResourceEventHandler(资源事件handler),DeltaFIFO,以及Process
代码如下:
1 | // client-go/tools/cache/controller.go line:345 |
controller运行
controller启动方法是 controller.Run(),首先看下源码:
1 | // client-go/tools/cache/controller.go line:100 |
通过 controller.Run方法,整个资源状态监控的过程就完成了。
总结:
通过以上讲informer中使用到的各个组件的功能及作用,整个informer工作流程大概如下:
- 首先调用ListWatch中的List方法,初步将k8s中待监听资源拉到本地,将其加到本地存储Indexer和事件队列DeltaFIFO中(初始化时将这些资源的变化事件看做是ADD)
- 异步1:使用ListWatch中Watch方法不断去监听事件,监听到后将其加到 DeltaFIFO中
- 异步1:无限循环:
- 阻塞方法去从队列中取出数据(使用到了sync.cond)
- 取到的数据类型是 Deltas,调用 controller中的Process方法(上边的NewIndexerInformer里定义的)
- // 在Process方法里调用用户自定义的handler
以上过程讲的比较简单,实际client-go的代码中很多出错逻辑处理、同步数据、以及多协程时的数据存储优化。
手撸实现
client-go中,controller 部分代码使用了很多接口,而且封装的太深,看它的源码时没那么容易。
基于以上对 controller 的分析,自己实现了一个比较简单的 controller,弄懂这个后再去深究 client-go的实现会事半功倍。
代码地址:https://github.com/geedchin/client-go-src-learning
核心部分在 02_watch中