本篇文章简单介绍controller实现过程中涉及到的client-go中的几个组件,并尝试使用简单的方式手撸一个可以实现一个类似client-go中controller功能的demo。

注:本文client-go是从kubernetes 1.13的项目源码中拷贝出来的,文中标记的代码行数以此为准。

简介

NewIndexerInformer()是定义在client-go/tools/cache/shared_informer中的一个函数,返回值为indexer,controller,本文主要讲controller的功能实现(也会简单提到indexer),它用来监听k8s集群中的某一种资源,针对资源对象的不同事件(add/update/delete)执行用户自定义的事件处理函数。

其在k8s中的应用十分广泛

  1. k8s里的系统组件:k8s调度器里通过监听pod资源对象来对其进行调度、k8s的kube-proxy通过监听service/endpoints资源对象的变化来配置各个节点的网络等等;
  2. CRD
  3. 使用该组件对k8s中的资源进行状态监控(deployment、daemonSet、pod等)
  4. 使用该组件对k8s中的资源配额进行监控
  5. 其他

controller 基本组成部分介绍

先讲几个在controller中引用到的几个结构体

Indexer

首先看一下如何New一个Indexer对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

// client-go/tools/cache/store.go line: 112
type cache struct {
// cacheStorage bears the burden of thread safety for the cache
cacheStorage ThreadSafeStore
// keyFunc is used to make the key for objects stored in and retrieved from items, and
// should be deterministic.
keyFunc KeyFunc
}
// ...

// client-go/tools/cache/store.go line: 239
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: keyFunc,
}
}
// 注KeyFunc定义: type KeyFunc func(obj interface{}) (string, error)

可以看到,NewIndexer返回的是一个cache对象,而cache实现了Indexer接口。
再结合cache的定义可以知道,此处的Indexer就是一个线程安全的存储,keyFunc的作用就是给定一个对象,然后返回该对象的key值。暂时简单地理解Indexer不去深究其内部其他逻辑,将其视为一个线程安全的map存储即可,这个mapkey值可以通过调用keyFunc(val)获得。

ListWatch

首先看下 ListWatch的定义:

1
2
3
4
5
6
7
8
9
// ListWatch knows how to list and watch a set of apiserver resources.  It satisfies the ListerWatcher interface.
// It is a convenience function for users of NewReflector, etc.
// ListFunc and WatchFunc must not be nil
type ListWatch struct {
ListFunc ListFunc
WatchFunc WatchFunc
// DisableChunking requests no chunking for this list watcher.
DisableChunking bool
}

主要包括两个函数对象(先不去理会DisableChunking),ListFunc作用主要是列出k8s集群中的资源对象,WatchFunc作用主要是监听k8s集群中的资源对象。

先看下创建ListWatch的代码吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Do().
Get()
}
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
options.Watch = true
optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Watch()
}
return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}

从代码中可用看到,这里创建了两个函数对象,ListFunc和WatchFunc,这两个函数返回的都是向apiserver发请求的对象,不过其发送请求的Action不同,一个是Get,一个是Watch,那ListWatch对象如何知道去Get/Watch哪个集群的资源对象呢,这个就是通过参数c来指定了,k8s的ClientSet实现了Getter接口,可以作为参数传进来。

到此处是不是感觉接触到了controller的核心部分了呢?到这里就算是比较接近底层的部分了,再向底层分析就是client-go向apiserver发请求和解析请求结果的部分了,深度上先到此为止
(在本篇中,我们不去关心更底层是如何Watch到资源的变化情况的,也不关心数据是怎么在client和apiserver中传输和解析的
只需知道ListWatch已经能够GetWatch到集群的资源就够了)。

DeltaFIFO

DeltaFIFO,从名字可以看出这是一个变化的队列,且是先进先出的队列。稍微解释一下,这个队列实际包含两层队列。先看下定义的这部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// client-go/tools/cache/delta_fifo.go  96行
type DeltaFIFO struct {
//...
// line:104
items map[string]Deltas
queue []string
// ...
keyFunc KeyFunc
// ...
}
// 补充一下Deltas的定义:
type Deltas []Delta
type Delta struct {
Type DeltaType // 字符串,Added,Updated,Deleted等
Object interface{}
}

先介绍一下 keyFunc,keyFunc和上边的Indexer里的keyFunc作用类似,都是根据对象返回一个key值,这里求得的key值主要用于 queue中的值,还有items这个map中的键值,从而,当从queue中取出队列头部元素时,可以根据这个元素值从items中取出对应的具体value

以下分析DeltaFIFO的两层队列实现:
首先,queue是一个队列,数组保存队列,也很容易实现先进先出,queue队列中元素出队时,从item中取出items[queue[0]]元素并删除,queue也删除0号元素即可完成队列的Pop操作。从Deltas的定义看到,这是一个Delta数组,items[queue[0]]也是一个队列,这个队列存的是Delta对象,既然它们存储在同一个items[key]下,那么表示它们是同一个资源对象的Delta,即:这个队列表示的是同一个对象的事件队列。

综上: queue表示的是不同资源对象的队列,items的value值表示的是同一资源对象的事件队列。

有点拗口,想象一下这个场景: 假设有3个pod资源对象,记为 a,b,c,

  1. a发生变化deltaA1,此时a加入到queue队列,a的变化事件加到items[a]这个事件队列
  2. b发生变化deltaB1,此时b假如到queue队列,b的变化事件加到items[b]这个事件队列
  3. a又发生变化deltaA2,此时items中已经存在items[a]了,表示a已经在队列queue中了,此时,获取到items[a]事件队列,然后将这次变化加入队列
    此时,queue和items中的数据为:
1
2
queue : [a,b]
items : {a:[deltaA1,deltaA2], b: [deltaB1]}

如果上边步骤3发生时,queue已经执行过一次出队操作,那么:
a发生变化deltaA2时,items中items[a]不存在,表示a不在队列中,便将a加到queue队列,deltaA2加到items[a]这个事件队列中去。此时,queue和items中的数据为:

1
2
3
queue: [b,a]
items: {a:[deltaA2], b: [deltaB1]}
// 注意:items是map,其内部的`k-v`键值对不分先后顺序,只有v的值分先后顺序,因为v是数组

讲了这么多,大概应该能明白DeltaFIFO的作用了吧,简单点说就是存储k8s资源对象变化事件的队列。

controller

controller初始化

在controller里,存储了ListWatch,objType(监听的资源对象类型),ResourceEventHandler(资源事件handler),DeltaFIFO,以及Process

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// client-go/tools/cache/controller.go line:345
func NewIndexerInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
indexers Indexers,
) (Indexer, Controller) {
// This will hold the client state, as we know it.
// 定义Indexer(线程安全的存储)
clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)

// This will hold incoming changes. Note how we pass clientState in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
// 定义FIFO(事件队列)
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState)

cfg := &Config{
Queue: fifo,
// 将ListWatch对象赋值到cfg,传递给controller
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: resyncPeriod,
RetryOnError: false,

// 资源对象变化处理函数,obj类型是Deltas,就是上边降到DeltaFIFO中 items这个map对象的value的数据类型
// 这个函数是在 fifo.queue这个队列出队的时候调用
Process: func(obj interface{}) error {
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Added, Updated:
// 判断Indexer中是否有该元素,有则表示是更新操作,没有表示是新增操作
// 该判断逻辑一定对的前提:下边Add事件时,Indexer同步add,Delete事件时,同步delete
// 此时更新Indexer中的数据,并调用 h.OnUpdate
// Indexer中存储数据的更重要的一点是,本地存储新更新到来之前的最新版本资源对象,那么当更新事件到来时,客户端可以有新旧对象供handler使用
if old, exists, err := clientState.Get(d.Object); err == nil && exists {
// update Indexer
if err := clientState.Update(d.Object); err != nil {
return err
}
// 调用handler事件
h.OnUpdate(old, d.Object)
} else {
// add to Indexer
if err := clientState.Add(d.Object); err != nil {
return err
}
// 调用handler事件
h.OnAdd(d.Object)
}
case Deleted:
// delete from Indexer
if err := clientState.Delete(d.Object); err != nil {
return err
}
// 调用handler事件
h.OnDelete(d.Object)
}
}
return nil
},
}
return clientState, New(cfg)
}

controller运行

controller启动方法是 controller.Run(),首先看下源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// client-go/tools/cache/controller.go line:100
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock

c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()

var wg wait.Group
defer wg.Wait()
// 此处启动一个协程,运行 r.Run函数
// r.Run即Reflector对象的Run方法,定义在 client-go/tools/cache/reflector.go line:121
// 通过查看 r.Run可以知道,它又运行了 r.ListAndWatch
// r.ListAndWatch里的逻辑就是真正获取k8s资源对象
// 在该函数中又调用r.watchHandler 来监控着资源的事件,并将其加到r.store中(在本文描述场景中,r.store即为DeltaFIFO)
wg.StartWithChannel(stopCh, r.Run)

// 此处启动processLoop循环处理事件
wait.Until(c.processLoop, time.Second, stopCh)
}

// 从队列(DeltaFIFO)中取出数据,执行 process(前边在NewIndexerInformer中定义的process)
// c.config.Queue.Pop方法定义在:client-go/tools/cache/delta_fifo.go line:411
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == FIFOClosedError {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}

通过 controller.Run方法,整个资源状态监控的过程就完成了。

总结:

通过以上讲informer中使用到的各个组件的功能及作用,整个informer工作流程大概如下:

  1. 首先调用ListWatch中的List方法,初步将k8s中待监听资源拉到本地,将其加到本地存储Indexer和事件队列DeltaFIFO中(初始化时将这些资源的变化事件看做是ADD)
  2. 异步1:使用ListWatch中Watch方法不断去监听事件,监听到后将其加到 DeltaFIFO中
  3. 异步1:无限循环:
    1. 阻塞方法去从队列中取出数据(使用到了sync.cond)
    2. 取到的数据类型是 Deltas,调用 controller中的Process方法(上边的NewIndexerInformer里定义的)
    3. // 在Process方法里调用用户自定义的handler

以上过程讲的比较简单,实际client-go的代码中很多出错逻辑处理、同步数据、以及多协程时的数据存储优化。

手撸实现

client-go中,controller 部分代码使用了很多接口,而且封装的太深,看它的源码时没那么容易。

基于以上对 controller 的分析,自己实现了一个比较简单的 controller,弄懂这个后再去深究 client-go的实现会事半功倍。

代码地址:https://github.com/geedchin/client-go-src-learning
核心部分在 02_watch中