调度器概述

K8S调度器主要负责将Pod调度到某Node上运行。可以把调度器看做一个黑盒,输入为:pod,nodeList,输出为node,即给定pod和node列表,返回一个将pod调度到某node的结果。

整个调度流程大致如下:

graph LR;
id1((Start))-->预选
预选-->优选
优选-->id2{是否调度到}
id2-->|是|id3
id2-->|否|抢占
抢占-->id3((结束))

源码阅读

本文基于14.3源码

入口文件是在 cmd/kube-scheduler/scheduler.go,加载配置逻辑是 cmd/kube-scheduler/app 文件夹下。整个调度过程逻辑是在 pkg/scheduler下。

源码阅读主要从以下几方面:启动前初始化、启动过程、调度过程(预选、优选、抢占)

启动前初始化过程

启动前初始化函数不少,重点关注pkg/scheduler/algorithmprovider/defaults包下的3个文件里的init()。

  1. default.go/init()方法主要注册了算法provider,目前有两个算法provider,分别是DefaultProviderClusterAutoscalerProvider。provider是一组算法的组合,包括预选、优选过程中的众多算法。此处注册过程只用到了算法的名字,到真正用到算法时会根据算法名取算法实现。
  2. register_predicates.go/init()方法主要注册多种预选算法,此处注册的方法通过名字与1中关联
  3. register_priorities.go/init()方法主要注册多种优选算法,此处注册的方法通过名字与1中关联

启动过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func main() {
rand.Seed(time.Now().UnixNano())

command := app.NewSchedulerCommand()

// TODO: once we switch everything over to Cobra commands, we can go back to calling
// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
// normalize func and add the go flag set by hand.
pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
// utilflag.InitFlags()
logs.InitLogs()
defer logs.FlushLogs()

if err := command.Execute(); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
}

首先从main函数(cmd/kube-scheduler/scheduler.go)看起:如上,可以看到main函数核心在 command.Execute,再找到command的类型是SchedulerCommand,然后追踪到该command的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func NewSchedulerCommand() *cobra.Command {
//...
cmd := &cobra.Command{
Use: "kube-scheduler",
Long: `...`,
Run: func(cmd *cobra.Command, args []string) {
if err := runCommand(cmd, args, opts); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
},
}
// ...
return cmd
}

重点关注 Run:func(cmd *cobra.Command,args []string),在这段代码里主要调用了 runCommand方法,再看这个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func runCommand(cmd *cobra.Command, args []string, opts *options.Options) error {
verflag.PrintAndExitIfRequested()
utilflag.PrintFlags(cmd.Flags())

//...
// 加载启动配置参数、选主(同一时间只有一个调度器真正在调度)
c, err := opts.Config()
stopCh := make(chan struct{})

// 根据配置的配置文件,complete配置
cc := c.Complete()
// 根据featureGate决定使用的算法
algorithmprovider.ApplyFeatureGates()

// Configz registration.
if cz, err := configz.New("componentconfig"); err == nil {
cz.Set(cc.ComponentConfig)
} else {
return fmt.Errorf("unable to register configz: %s", err)
}

return Run(cc, stopCh)
}

省略部分不重要代码,留下了核心代码并加了注释,应该很好懂了。最后执行Run方法,接下来看Run方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {

// 创建调度器,初始化调度器参数
sched, err := scheduler.New(cc.Client,
cc.InformerFactory.Core().V1().Nodes(),
cc.PodInformer,
cc.InformerFactory.Core().V1().PersistentVolumes(),
cc.InformerFactory.Core().V1().PersistentVolumeClaims(),
cc.InformerFactory.Core().V1().ReplicationControllers(),
cc.InformerFactory.Apps().V1().ReplicaSets(),
cc.InformerFactory.Apps().V1().StatefulSets(),
cc.InformerFactory.Core().V1().Services(),
cc.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
cc.InformerFactory.Storage().V1().StorageClasses(),
cc.Recorder,
cc.ComponentConfig.AlgorithmSource,
stopCh,
scheduler.WithName(cc.ComponentConfig.SchedulerName),
scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds))


// 启动所有 informers.
go cc.PodInformer.Informer().Run(stopCh)
cc.InformerFactory.Start(stopCh)

// Wait for all caches to sync before scheduling.
cc.InformerFactory.WaitForCacheSync(stopCh)
controller.WaitForCacheSync("scheduler", stopCh, cc.PodInformer.Informer().HasSynced)

// Prepare a reusable runCommand function.
run := func(ctx context.Context) {
sched.Run()
<-ctx.Done()
}

ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here
defer cancel()

// Leader election is disabled, so runCommand inline until done.
run(ctx)
return fmt.Errorf("finished without leader elect")
}

如上,主要初始化了调度器的参数,定义了一个可stop的方法run,最后执行run(ctx),查看run方法,里边的核心代码是 sched.Run(),此时代码进入到了 pkg/scheduler包里。继续追踪:

1
2
3
4
5
6
7
func (sched *Scheduler) Run() {
if !sched.config.WaitForCacheSync() {
return
}

go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}

最后执行的代码 go wait.Until…,实际上调用了sched.scheduleOne方法,并无限循环。

至此,启动过程结束,代码真正进到了调度处。

调度过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// scheduleOne does the entire scheduling workflow for a single pod.  It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne() {

// 从待调度队列中取出一个待调度的pod,此处无数据时会阻塞
pod := sched.config.NextPod()

// Synchronously attempt to find a fit for the pod.
start := time.Now()
// 执行调度方法,返回调度结果
scheduleResult, err := sched.schedule(pod)
// 如果调度出错,则可能进入抢占逻辑。
// 无论什么类型错误,进入if后都会return,结束本轮调度
if err != nil {
// 如果错误类型是 FitError
if fitError, ok := err.(*core.FitError); ok {
// 如果不允许抢占,则报错
if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
" No preemption is performed.")
} else {
// 否则,进入抢占
preemptionStartTime := time.Now()
sched.preempt(pod, fitError)
}

} else {
// 否则,报错
klog.Errorf("error selecting node for pod: %v", err)
metrics.PodScheduleErrors.Inc()
}
return
}

// ... 省略调度成功后绑定volume等操作
}

func (sched *Scheduler) schedule(pod *v1.Pod) (core.ScheduleResult, error) {
result, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister)
if err != nil {
pod = pod.DeepCopy()
sched.recordSchedulingFailure(pod, err, v1.PodReasonUnschedulable, err.Error())
return core.ScheduleResult{}, err
}
return result, err
}

主流程如上,先调度,再判断是否需要抢占,必要时进入抢占,调度成功则执行后续的收尾工作。再看一下调度方法(sched.config.Algorithm.Schedule):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (result ScheduleResult, err error) {
trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", pod.Namespace, pod.Name))
defer trace.LogIfLong(100 * time.Millisecond)

if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
return result, err
}

nodes, err := nodeLister.List()
if err != nil {
return result, err
}
if len(nodes) == 0 {
return result, ErrNoNodesAvailable
}

if err := g.snapshot(); err != nil {
return result, err
}

trace.Step("Computing predicates")
startPredicateEvalTime := time.Now()

// 核心代码,预选过程,找到可运行pod的nodes
filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
if err != nil {
return result, err
}
// 没找到,直接return
if len(filteredNodes) == 0 {
return result, &FitError{
Pod: pod,
NumAllNodes: len(nodes),
FailedPredicates: failedPredicateMap,
}
}

trace.Step("Prioritizing")
startPriorityEvalTime := time.Now()
// When only one node after predicate, just use it.
if len(filteredNodes) == 1 {

return ScheduleResult{
SuggestedHost: filteredNodes[0].Name,
EvaluatedNodes: 1 + len(failedPredicateMap),
FeasibleNodes: 1,
}, nil
}

metaPrioritiesInterface := g.priorityMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)
// 核心代码,优选过程,计算每个node的优先级得分
priorityList, err := PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
if err != nil {
return result, err
}

trace.Step("Selecting host")
// 找到优先级得分最大的node
host, err := g.selectHost(priorityList)
return ScheduleResult{
SuggestedHost: host,
EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap),
FeasibleNodes: len(filteredNodes),
}, err
}

补充

  1. pod新特性: spec.topologySpreadConstraint
  2. svc新特性: spec.topologyKeys

调度流程

node取样与整体流程

  1. 每个调度循环里,都会调用snapshot()方法来获取node快照,底层通过nodeTree结构实现对node进行分区,每次从中取node时轮流从分区中取数据,以此保证node节点足够分散。
  2. 在node节点足够分散的情况下,通过设置node取样规模,经过一层filter,取出指定可行数量的node(g.numFeasibleNodesToFind),小于100时直接返回所有,大于100而且取样比例小于100%时才会执行。取样比例在 kubeSchedulerConfiguration里配置
  3. 调用findNodesThatPassFilters方法,查找满足调度要求的节点。
  4. 调用findNodesThatPassExtenders执行用户自定义的调度程序。
  5. 计算得分
  6. selectHost
  7. assume -> 本地pod标记nodeName,node的pod列表把当前pod加进去,node移至nodeCache头,下次循环取snapshot时更新用。
  8. prebind extension point
  9. bind->extendersBinding->bindPlugin
  10. 如果上一步失败:defer里执行finishBinding会从node.PodList删除当前pod,取消其assume状态。然后recordSchedulingFailure-> sched.Error() -> MakeDefaultErrorFunc()() 把pod放到 sched.SchedulingQueue.unschedulableQ (定时任务处理数据:sched.Run()中sched.SchedulingQueue.Run())最后流转到activeQ重新调度
  11. postBindPlugin

调度框架

  1. QueueSort: 支持自定义排序
  2. PreFilter: 对Pod请求预处理
  3. Filter: Filter逻辑
  4. ExtenderFilter: 外部扩展filter
  5. PostFilter: logs、metrics等
  6. PreScore: score前预处理
  7. Score: 打分
  8. ExtenderScore: 外部扩展打分
  9. Reserve: 有状态plugin可对资源记账
  10. PreBind:
  11. Bind: (可执行extenderBind)
  12. Unreserve: 恢复操作,失败时取消记账内容
  13. PostBind

预选:Predicates(1.18版本中更改为filter, pkg/shceduler/framework/plugins)

存储相关

  1. nodeVolumeLimits
  2. volumeZone
  3. volumeRestrictions

节点相关

  1. nodeAffinity
  2. nodeLabel
  3. nodePorts
  4. nodePreferAvoidPods
  5. nodeResource
  6. nodeUnschedulable
  7. taintToleration

Pod间关系

  1. podTopologySpread
  2. interPodAffinity

打分:Priorities(1.18版本中更改为score, pkg/shceduler/framework/plugins)

解决问题: 碎片、容灾、水位、亲和性

node水位

  1. 优先打散: LeastAllocated
  2. 优先堆叠: MostAllocated
  3. 碎片率: BalancedAllocation
  4. 自定义比率: RequestedToCapacityRatio

pod间关系(topo,service,controller)

  1. Pod间topo: PodTopologySpread
  2. serivce: ServiceAffinity

    node亲和性

  3. nodeAffinity
  4. nodeLabel
  5. nodePreferAvoidPods
  6. ImageLocality
  7. taintToleration
  8. resourceLimits

    pod亲和性

  9. DefaultPodTopologySpread
  10. interPodAffinity

最后

本文仅讲解了调度器启动前的初始化过程以及调度整体流程,细节实现还需读者自行了解。