cmd\kube-scheduler\scheduler.go
func main() {
// 新构建一个scheduler运行命令
command := app.NewSchedulerCommand() --->cmd\kube-scheduler\app\server.go
code := cli.Run(command)
os.Exit(code)
}
cmd\kube-scheduler\app\server.go
NewSchedulerCommand
cmd := &cobra.Command{
RunE: func(cmd *cobra.Command, args []string) error {
return runCommand(cmd, opts, registryOptions...)
},
Args: func(cmd *cobra.Command, args []string) error {
for _, arg := range args {
if len(arg) > 0 {
return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
}
}
return nil
},
}
runCommand
cc, sched, err := Setup(ctx, opts, registryOptions...)
...
return Run(ctx, cc, sched)
Setup
cc := c.Complete()
...
sched, err := scheduler.New(cc.Client,... --->pkg\scheduler\scheduler.go
Run
waitingForLeader := make(chan struct{})
isLeader := func() bool {
select {
case _, ok := <-waitingForLeader:
// if channel is closed, we are leading
return !ok
default:
// channel is open, we are waiting for a leader
return false
}
}
...
if cc.LeaderElection != nil {
// 选举的回调函数
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
// 选举成功则会关闭等待信道,并由自己启动sched,与上面isLeader函数第一段对应
close(waitingForLeader)
sched.Run(ctx) --->pkg\scheduler\scheduler.go这里就跳转到scheduler具体逻辑里开始正常工作了,sched是Setup函数中初始化的
},
OnStoppedLeading: func() {
// 选举失败则正常结束退出了
select {
case <-ctx.Done():
// We were asked to terminate. Exit 0.
klog.InfoS("Requested to terminate, exiting")
os.Exit(0)
default:
// We lost the lock.
klog.Exitf("leaderelection lost")
}
},
}
leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
if err != nil {
return fmt.Errorf("couldn't create leader elector: %v", err)
}
// 发起一次新选举过程
leaderElector.Run(ctx) ---->vendor\k8s.io\client-go\tools\leaderelection\leaderelection.go
return fmt.Errorf("lost lease")
}
pkg\scheduler\scheduler.go
// 注释已经很清楚了,运行并监听调度器,并且一直持续到上下文停止,也就是出现情况才会gg,否则就一直跑
// Run begins watching and scheduling. It starts scheduling and blocked until the context is done.
func (sched *Scheduler) Run(ctx context.Context) {
sched.SchedulingQueue.Run()
wait.UntilWithContext(ctx, sched.scheduleOne, 0)
sched.SchedulingQueue.Close()
}
// 为每个pod执行整个调度工作流
scheduleOne
podInfo := sched.NextPod()
// 1.获取pod信息并检验
if podInfo == nil || podInfo.Pod == nil {
return
}
// 2.获取pod绑定的调度器
fwk, err := sched.frameworkForPod(pod)
// 3.是否跳过调度,这里感觉23顺序可以调下
if sched.skipPodSchedule(fwk, pod) {
return
}
...
// 4.使用算法进行调度,得出最终目标节点
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, sched.Extenders, fwk, state, pod) --->pkg\scheduler\generic_scheduler.go
// 5.目标节点与pod绑定 --->就是把assumed.Spec.NodeName置为了该主机名
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
// 6.运行预留的一些插件
sts := fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
// 7.运行钩子插件
runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
// 8.更新pod队列
if len(podsToActivate.Map) != 0 {
sched.SchedulingQueue.Activate(podsToActivate.Map)
podsToActivate.Map = make(map[string]*v1.Pod)
}
// 9.运行prebind插件阶段
preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
// 10.运行bind时的拓展插件及本身插件
err := sched.bind(bindingCycleCtx, fwk, assumedPod, scheduleResult.SuggestedHost, state)
// 11.运行绑定结束后插件
fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
// pod.Spec.SchedulerName对应的就是我们定义yaml文件中的spec下schedulerName参数,但是一般使用情况不会去定义,所以是默认的default-scheduler,所以其实我们可以为不同pod定义不同的调度器达到更适合的效果
func (sched *Scheduler) frameworkForPod(pod *v1.Pod) (framework.Framework, error) {
fwk, ok := sched.Profiles[pod.Spec.SchedulerName]
if !ok {
return nil, fmt.Errorf("profile not found for scheduler name %q", pod.Spec.SchedulerName)
}
return fwk, nil
}
func (sched *Scheduler) skipPodSchedule(fwk framework.Framework, pod *v1.Pod) bool {
// 情况1:pod被标记要删除
if pod.DeletionTimestamp != nil {
return true
}
// 情况2:pod在被调度前信息又被变更了,则可以将它再次扔会待调度队列,并跳过此次调度
isAssumed, err := sched.SchedulerCache.IsAssumedPod(pod)
if err != nil {
return false
}
return isAssumed
}
pkg\scheduler\generic_scheduler.go
**Schedule**
// 预选
feasibleNodes, diagnosis, err := g.findNodesThatFitPod(ctx, extenders, fwk, state, pod)
if len(feasibleNodes) == 0 {
无可用节点,返回对应错误
}
if len(feasibleNodes) == 1 {
仅有一个的时候就不用优选了,直接返回它
}
...
// 优选
priorityList, err := prioritizeNodes(ctx, extenders, fwk, state, pod, feasibleNodes)
...
// 选择目标地调度节点
host, err := g.selectHost(priorityList)
// 返回host
// 预选阶段
**findNodesThatFitPod**
// prefilter阶段
s := fwk.RunPreFilterPlugins(ctx, state, pod) --->pkg\scheduler\framework\runtime\framework.go
...
// 在有指定节点选择的情况下,直接计算符合条件的节点并处理结果
if len(pod.Status.NominatedNodeName) > 0 && feature.DefaultFeatureGate.Enabled(features.PreferNominatedNode) {
feasibleNodes, err := g.evaluateNominatedNode(ctx, extenders, pod, fwk, state, diagnosis)
if err != nil {
// 指定node都无法满足则gg
klog.ErrorS(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName)
}
if len(feasibleNodes) != 0 {
// 有能满足的直接返回不用找全部节点了
return feasibleNodes, diagnosis, nil
}
}
...
// 将所有节点放入预选插件
feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, allNodes)
...
// 使用扩展插件再次过滤
feasibleNodes, err = findNodesThatPassExtenders(extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
// 返回feasibleNodes
// 指定节点的评估流程方法
evaluateNominatedNode
nnn := pod.Status.NominatedNodeName
nodeInfo, err := g.nodeInfoSnapshot.Get(nnn)
node := []*framework.NodeInfo{nodeInfo}
// 跟主filter流程一样,不同之处仅在于计算的目标是符合指定nodename的节点
feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, node)
feasibleNodes, err = findNodesThatPassExtenders(extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
**findNodesThatPassFilters**
// 确定本轮要进行过滤的节点的个数,这里并不是笼统的直接计算当前集群的所有节点,但是每次调度计算的节点数量最多100个
numNodesToFind := g.numFeasibleNodesToFind(int32(len(nodes)))
...
if !fwk.HasFilterPlugins() {
// 这里面的逻辑我们可以看出scheduler是有一个nextStartNodeIndex标记位的,我们每次调度都会更新这个调度位,下次调度会从该标志位开始向后找,这是因为在大规模集群中节点数量超过了100,但是每次最多选举100个节点进行调度,如果没有标志位那就会一直循环前100个节点,这是不公平也是错误的
length := len(nodes)
for i := range feasibleNodes {
feasibleNodes[i] = nodes[(g.nextStartNodeIndex+i)%length].Node()
}
g.nextStartNodeIndex = (g.nextStartNodeIndex + len(feasibleNodes)) % length
return feasibleNodes, nil
}
...
checkNode := func(i int) {
...
//
if length > numNodesToFind {
// 个数够了就直接取消上下文了停止计算了
cancel()
// 使用atomic操作是因为该计算是所有节点一起进行的
atomic.AddInt32(&feasibleNodesLen, -1)
} else {
feasibleNodes[length-1] = nodeInfo.Node()
}
...
}
// checkNode函数对于每个节点是并行的
fwk.Parallelizer().Until(ctx, len(nodes), checkNode)
// 更新标志位
processedNodes := int(feasibleNodesLen) + len(diagnosis.NodeToStatusMap)
g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(nodes)
**numFeasibleNodesToFind**
func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {
// minFeasibleNodesToFind是常量100,默认percentageOfNodesToScore为DefaultPercentageOfNodesToScore是0。因此常规情况下,只有集群节点数在100个以内时,才会计算所有节点
if numAllNodes < minFeasibleNodesToFind || g.percentageOfNodesToScore >= 100 {
return numAllNodes
}
// 下面这段代码的总义就是随着集群内节点变多,那么选取的百分比会逐渐变低,最低到5%
adaptivePercentage := g.percentageOfNodesToScore
if adaptivePercentage <= 0 {
// 由于percentageOfNodesToScore默认为0,所以basePercentageOfNodesToScore默认为50
basePercentageOfNodesToScore := int32(50)
// 当节点数量过百后,adaptivePercentage = 50 - (总节点数量)/125
adaptivePercentage = basePercentageOfNodesToScore - numAllNodes/125
// minFeasibleNodesPercentageToFind默认是5
if adaptivePercentage < minFeasibleNodesPercentageToFind {
adaptivePercentage = minFeasibleNodesPercentageToFind
}
}
// 按刚才计算出的百分比选取当前个数的节点进行计算应计算的节点数量
numNodes = numAllNodes * adaptivePercentage / 100
if numNodes < minFeasibleNodesToFind {
// 但是最少100个
return minFeasibleNodesToFind
}
return numNodes
}
// 优选阶段
**prioritizeNodes**
// prescore阶段
preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes)
// 根据插件为各节点打分
scoresMap, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)
// 如果日志等级到10,你可以看到每个插件为每个节点打分的情况
if klog.V(10).Enabled() {
for plugin, nodeScoreList := range scoresMap {
for _, nodeScore := range nodeScoreList {
klog.InfoS("Plugin scored node for pod", "pod", klog.KObj(pod), "plugin", plugin, "node", nodeScore.Name, "score", nodeScore.Score)
}
}
}
// 循环scoresMap并取每个节点对应分值最终加和为该节点最终得分
for i := range nodes {
result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
for j := range scoresMap {
result[i].Score += scoresMap[j][i].Score
}
}
// 如果有拓展插件则也会运行继续打分
if len(extenders) != 0 && nodes != nil {
...
for i := range extenders {
...
// 拓展插件按权重打分
prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
}
}
// selectHost就是找出了优选节点中的最高分节点
func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (string, error) {
maxScore := nodeScoreList[0].Score
selected := nodeScoreList[0].Name
cntOfMaxScore := 1
for _, ns := range nodeScoreList[1:] {
if ns.Score > maxScore {
maxScore = ns.Score
selected = ns.Name
cntOfMaxScore = 1
} else if ns.Score == maxScore {
cntOfMaxScore++
if rand.Intn(cntOfMaxScore) == 0 {
selected = ns.Name
}
}
}
return selected, nil
}