zoukankan      html  css  js  c++  java
  • 《k8s-1.13版本源码分析》-调度优选

    源码分析系列文章已经开源到github,地址如下:

    • github:https://github.com/farmer-hutao/k8s-source-code-analysis
    • gitbook:https://farmer-hutao.github.io/k8s-source-code-analysis

    本文大纲

    • 走近priority过程
    • PrioritizeNodes整体流程
      • Results
      • Old Priority Function
      • Map-Reduce
      • Combine Scores
    • Fun和Map-Reduce实例分析
      • InterPodAffinityPriority(Function)
      • CalculateNodeAffinityPriorityMap(Map)
      • CalculateNodeAffinityPriorityReduce(Reduce)
      • 小结

    1. 走近priority过程

    pkg/scheduler/core/generic_scheduler.go:186

    priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
    

    今天的分析从这行代码开始。

    PrioritizeNodes要做的事情是给已经通过predicate的nodes赋上一个分值,从而抉出一个最优node用于运行当前pod. 第一次看priority可能会一脸蒙,和predicate中的逻辑不太一样;大伙得耐下性子多思考,实在有障碍也可以先不求甚解,整体过完后再二刷代码,再不行三刷,总会大彻大悟的!

    从注释中可以找到关于PrioritizeNodes的原理(pkg/scheduler/core/generic_scheduler.go:624):

    • PrioritizeNodes通过并发调用一个个priority函数来给node排优先级。每一个priority函数会给一个1-10之间的分值,0最低10最高。
    • 每一个priority函数可以有自己的权重,单个函数返回的分值*权重后得到一个加权分值,最终所有的加权分值加在一起就是这个node的最终分值。

    然后我们先函数签名入手:

    pkg/scheduler/core/generic_scheduler.go:624

    func PrioritizeNodes(
        pod *v1.Pod,
        nodeNameToInfo map[string]*schedulercache.NodeInfo,
        meta interface{},
        priorityConfigs []algorithm.PriorityConfig,
        nodes []*v1.Node,
        extenders []algorithm.SchedulerExtender,
    ) (schedulerapi.HostPriorityList, error)
    

    形参定义和返回值:

    • pod *v1.Pod* // pod就不用说了;
    • *nodeNameToInfo map[string]*schedulercache.NodeInfo // 这个也不需要讲,字面意思代表一切;
    • meta interface{} // 和predicate里的meta不太一样,下面会贴个debug的图先,具体后面再看;
    • priorityConfigs []algorithm.PriorityConfig // 包含优选算法各种信息,比较重要;
    • nodes []*v1.Node // node集合,不需要解释了;
    • extenders []algorithm.SchedulerExtender // extender逻辑放到后面单独讲。

    meta实参长这个样子:

    返回值只需要看一下schedulerapi.HostPriorityList类型的含义了,这个类型之前也提过,后面频繁涉及到操作这个结构,所以这里再贴一次,大伙得烂熟于心才行!

    pkg/scheduler/api/types.go:305

    type HostPriority struct {
        Host string
        Score int
    }
    type HostPriorityList []HostPriority
    

    着重分析一下这2个type,虽然很简单,还是有必要啰嗦一下,必须记在心里。HostPriority这个struct的属性是HostScore,一个是string一个是int,所以很明显HostPriority所能够保存的信息是一个节点的名字和分值,再仔细一点说就是这个结构保存的是一个node在一个priority算法计算后所得到的结果;然后看HostPriorityList类型,这个类型是上一个类型的“集合”,集合表达的是一个node多个算法还是多个node一个算法呢?稍微思考一下可以知道HostPriorityList中存的是多个Host和Score的组合,所以HostPriorityList这个结构是要保存一个算法作用于所有node之后,得到的所有node的Score信息的。(这里我们先理解成一个算法的结果,作为函数返回值这里肯定是要保留所有算法作用后的最终node的Score,所以函数后半部分肯定有combine分值的步骤。)

    2. PrioritizeNodes整体流程

    前面说到PrioritizeNodes()函数也就是node优选的具体逻辑,这个函数略长,我们分段讲解。

    2.1. Results

    PrioritizeNodes()函数开头的逻辑很简单,我们先从第一行看到results定义的这一行。

    pkg/scheduler/core/generic_scheduler.go:634

    if len(priorityConfigs) == 0 && len(extenders) == 0 {
        // 这个if很明显是处理特殊场景的,就是优选算法一个都没有配置(extenders同样没有配置)的时候怎么做;
        // 这个result是要当作返回值的,HostPriorityList类型前面唠叨了很多了,大家得心里有数;
       result := make(schedulerapi.HostPriorityList, 0, len(nodes))
       for i := range nodes {
           // 这一行代码是唯一的“逻辑了”,下面直到for结束都是简单代码;所以我们看一下EqualPriorityMap
           // 函数的作用就行了。这里我不贴代码,这个函数很短,作用就是设置每个node的Score相同(都为1)
           // hostPriority的类型也就是schedulerapi.HostPriority类型,再次强调这个类型是要烂熟于心的;
          hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
          if err != nil {
             return nil, err
          }
           // 最终的result也就是设置了每个node的Score为1的schedulerapi.HostPriorityList类型数据;
          result = append(result, hostPriority)
       }
       return result, nil
    }
    // 这里只是简单定义3个变量,一把锁,一个并发等待相关的wg,一个错误集合errs;
    var (
       mu   = sync.Mutex{}
       wg   = sync.WaitGroup{}
       errs []error
    )
    // 这里定义了一个appendError小函数,逻辑很简单,并发场景下将错误信息收集到errs中;
    appendError := func(err error) {
       mu.Lock()
       defer mu.Unlock()
       errs = append(errs, err)
    }
    // 最后一个变量results也不难理解,类型是[]schedulerapi.HostPriorityList,这里需要注意这个类型
    // 的作用,它保存的是所有算法作用所有node之后得到的结果集,相当于一个二维数组,每个格子是1个算法
    // 作用于1个节点的结果,一行也就是1个算法作用于所有节点的结果;一行展成一个二维就是所有算法作用于所有节点;
    results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))
    

    到这里要求大家心中能够想象上面提到的results是什么样的,可以借助纸笔画一画。下面的代码会往这个二维结构里面存储数据。

    2.2. Old Priority Function

    我们既然讲到“老式”,后面肯定有对应的“新式”。虽然这种函数已经DEPRECATED了,不过对于我们学习掌握优选流程还是很有帮助的。默认的优选算法里其实也只有1个是这在old形式的了:

    贴这块代码之前我们先关注一下多次出现的priorityConfigs这个变量的类型:

    函数形参中有写到:priorityConfigs []algorithm.PriorityConfig,所以我们直接看PriorityConfig是什么类型:

    pkg/scheduler/algorithm/types.go:62

    // PriorityConfig is a config used for a priority function.
    type PriorityConfig struct {
       Name   string
       Map    PriorityMapFunction
       Reduce PriorityReduceFunction
       // TODO: Remove it after migrating all functions to
       // Map-Reduce pattern.
       Function PriorityFunction
       Weight   int
    }
    

    PriorityConfig中有一个Name,一个Weight,很好猜到意思,名字和权重嘛。剩下的Map、Reduce和Function目测代表的就是优选函数的新旧两种表达方式了。我们先看旧的Function属性的类型PriorityFunction是什么:

    pkg/scheduler/algorithm/types.go:59

    type PriorityFunction func(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error)
    

    很明显这个类型代表了一个priority函数,入参是pod、nodeNameToInfo和nodes,返回值是HostPriorityList,也就是我们前面提到的1个priority函数作用于每个node后得到了Score信息,存结果的结构就是这个HostPriorityList;看起来很和谐~

    然后讲回PrioritizeNodes过程:

    pkg/scheduler/core/generic_scheduler.go:661

    for i := range priorityConfigs {
        // 如果第i个优选配置(priorityConfig)定义了老函数,则调用之;
        if priorityConfigs[i].Function != nil {
            wg.Add(1)
            // 注意这里的参数index,这里传入的实参是上面的i;
            go func(index int) {
                defer wg.Done()
                var err error
                // 所以这里的results[index]就好理解了;后面priorityConfigs[index]的索引也是index,
                // 这里表达的是第N个优选配置里有Function,那么这个Function的计算结果保存在
                // results的第N个格子里;
                results[index], err = priorityConfigs[index].Function(pod, nodeNameToInfo, nodes)
                if err != nil {
                    appendError(err)
                }
            }(i)
        } else {
            // 如果没有定义Function,其实也就是使用了Map-Reduce方式的,这里先存个空的结构占位;
            results[i] = make(schedulerapi.HostPriorityList, len(nodes))
        }
    }
    

    上面这段代码逻辑还算好理解,唯一有点小绕的还是前面强调的HostPriorityList相关类型的操作上。

    2.3. Map-Reduce

    关于map-reduce思想我就不在这里赘述了,大数据行业很流行的一个词汇,百度一下(如果你能够google,自然更好咯)可以找到一大堆介绍的文章。

    简单说map-reduce就是:Map是映射,Reduce是归约;map是统计一本书中的一页出现了多少次k8s这个词,reduce是将这些map结果汇总在一起得到最终结果。(map一般都是将一个算法作用于一堆数据集的每一个元素,得到一个结果集,reduce有各种形式,可以是累加这些结果,或者是对这个结果集做其他复杂的f(x)操作。

    看看在Scheduler里面是怎么用Map-Reduce的吧:

    // 这个并发逻辑之前介绍过了,我们直接看ParallelizeUntil的最后一个参数就行,这里直接写了一个匿名函数;
    workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
        // 这里的index是[0,len(nodes)-1],相当于遍历所有的nodes;
       nodeInfo := nodeNameToInfo[nodes[index].Name]
        // 这个for循环遍历的是所有的优选配置,如果有老Fun就跳过,新逻辑就继续;
       for i := range priorityConfigs {
          if priorityConfigs[i].Function != nil {
              // 因为前面old已经运行过了
             continue
          }
    
          var err error
           // 这里的i和前面老Fun的互补,老Fun中没有赋值的results中的元素就在这里赋值了;
           // 注意到这里调用了一个Map函数就直接赋值给了results[i][index],这里的index是第一行这个
           // 匿名函数的形参,通过ParallelizeUntil这个并发实现所有node对应一个优选算法的分值计算;
          results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
          if err != nil {
             appendError(err)
             results[i][index].Host = nodes[index].Name
          }
       }
    })
    
    for i := range priorityConfigs {
        // 没有定义Reduce函数就不处理;
       if priorityConfigs[i].Reduce == nil {
          continue
       }
       wg.Add(1)
       go func(index int) {
          defer wg.Done()
           // 调用Reduce函数
          if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
             appendError(err)
          }
          if klog.V(10) {
             for _, hostPriority := range results[index] {
                klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Host, priorityConfigs[index].Name, hostPriority.Score)
             }
          }
       }(i)
    }
    // Wait for all computations to be finished.
    wg.Wait()
    if len(errs) != 0 {
       return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
    }
    

    看到这里我们可以发现老Fun和Map的区别不大,都是优选函数的执行过程。那为什么会存在两种形式呢?我们看完PrioritizeNodes整体流程后通过具体的Fun和Map-Reduce实现来看二者的区别。

    2.4. Combine Scores

    这块的代码很简单,我们先抛开extender的逻辑,剩下的代码如下:

    // Summarize all scores.
    // 这个result和前面的results类似,result用于存储每个node的Score,到这里已经没有必要区分算法了;
    result := make(schedulerapi.HostPriorityList, 0, len(nodes))
    // 循环执行len(nodes)次
    for i := range nodes {
        // 先在result中塞满所有node的Name,Score初始化为0;
       result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
        // 执行了多少个priorityConfig就有多少个Score,所以这里遍历len(priorityConfigs)次;
       for j := range priorityConfigs {
           // 每个算法对应第i个node的结果分值加权后累加;
          result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
       }
    }
    
    return result, nil
    

    这块逻辑很清晰,要将前面得到的二维结果results压缩成一维的加权分值集合result,最终返回这个result.

    从这里我们还可以得到一个结论,不管是Fun还是Map-Reduce,处理的结果都是填充results这个二维结构,所以Map-Reduce也没有什么神秘的,下面通过具体的算法来看二者有何异同。

    3. Fun和Map-Reduce实例分析

    3.1. InterPodAffinityPriority(Function)

    这个算法做的是Pod间亲和性优选,也就是亲和pod越多的节点分值越高,反亲和pod越多分值越低。

    我们撇开具体的亲和性计算规则,从优选函数的形式上看一下这段代码的逻辑:

    pkg/scheduler/algorithm/priorities/interpod_affinity.go:119

    func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) {
        affinity := pod.Spec.Affinity
        // 是否有亲和性约束;
        hasAffinityConstraints := affinity != nil && affinity.PodAffinity != nil
        // 是否有反亲和性约束;
        hasAntiAffinityConstraints := affinity != nil && affinity.PodAntiAffinity != nil
    
        // 这里有一段根据亲和性和反亲和性来计算一个node上匹配的pod数量的逻辑,我们先跳过这些逻辑,从优选算法实现的角度看这个算法的架子;
    
        // 当遍历完所有的node之后,可以得到1个最高分和1个最低分,分别记为maxCount和minCount;
        for _, node := range nodes {
            if pm.counts[node.Name] > maxCount {
                maxCount = pm.counts[node.Name]
            }
            if pm.counts[node.Name] < minCount {
                minCount = pm.counts[node.Name]
            }
        }
        // 这个result类型和前面看到的一样,都是存储单个算法的计算结果的;
        result := make(schedulerapi.HostPriorityList, 0, len(nodes))
        for _, node := range nodes {
            fScore := float64(0)
            // 如果分差大于0,也就是说不是所有的node都一样的情况,需要对分值做一个处理;
            if (maxCount - minCount) > 0 {
                // MaxPriority定义的是优选最高分10,第二个因数是当前node的count-最小count,
                // 然后除以(maxCount - minCount);举个例子,当前node的计算结果是5,最大count是20,最小
                // count是-3,那么这里就是10*[5-(-3)/20-(-3)]
                // 这个计算的结果显然会在[0-10]之间;
                fScore = float64(schedulerapi.MaxPriority) * ((pm.counts[node.Name] - minCount) / (maxCount - minCount))
            }
            // 如果分差不大于0,这时候int(fScore)也就是0,对于各个node的结果都是0;
            result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)})
        }
        return result, nil
    }
    

    如上,我们可以发现最终这个函数计算出了每个node的分值,这个分值在[0-10]之间。所以说到底Function做的事情就是根据一定的规则给每个node赋一个分值,这个分值要求在[0-10]之间,然后把这个HostPriorityList返回就行。

    3.2. CalculateNodeAffinityPriorityMap(Map)

    这个算法和上一个类似,上一个是Pod的Affinity,这个是Node的Affinity;我们来看代码:

    pkg/scheduler/algorithm/priorities/node_affinity.go:34

    func CalculateNodeAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
        node := nodeInfo.Node()
        if node == nil {
            return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
        }
    
        // default is the podspec.
        affinity := pod.Spec.Affinity
        if priorityMeta, ok := meta.(*priorityMetadata); ok {
            // We were able to parse metadata, use affinity from there.
            affinity = priorityMeta.affinity
        }
    
        var count int32
        if affinity != nil && affinity.NodeAffinity != nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
            // Match PreferredDuringSchedulingIgnoredDuringExecution term by term.
            for i := range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
                preferredSchedulingTerm := &affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution[i]
                if preferredSchedulingTerm.Weight == 0 {
                    continue
                }
    
                nodeSelector, err := v1helper.NodeSelectorRequirementsAsSelector(preferredSchedulingTerm.Preference.MatchExpressions)
                if err != nil {
                    return schedulerapi.HostPriority{}, err
                }
                if nodeSelector.Matches(labels.Set(node.Labels)) {
                    count += preferredSchedulingTerm.Weight
                }
            }
        }
    
        return schedulerapi.HostPriority{
            Host:  node.Name,
            Score: int(count),
        }, nil
    }
    

    撇开具体的亲和性计算细节,我们可以发现这个的count没有特定的规则,可能会加到10以上;另外这里的返回值是HostPriority类型,前面的Function返回了HostPriorityList类型。

    map函数

    pkg/scheduler/algorithm/priorities/selector_spreading.go:221

    func (s *ServiceAntiAffinity) CalculateAntiAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
        var firstServiceSelector labels.Selector
    
        node := nodeInfo.Node()
        if node == nil {
            return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
        }
        priorityMeta, ok := meta.(*priorityMetadata)
        if ok {
            firstServiceSelector = priorityMeta.podFirstServiceSelector
        } else {
            firstServiceSelector = getFirstServiceSelector(pod, s.serviceLister)
        }
        // 查找给定node在给定namespace下符合selector的pod,返回值是[]*v1.Pod
        matchedPodsOfNode := filteredPod(pod.Namespace, firstServiceSelector, nodeInfo)
    
        return schedulerapi.HostPriority{
            Host:  node.Name,
            // 返回值中Score设置成上面找到的pod的数量
            Score: int(len(matchedPodsOfNode)),
        }, nil
    }
    

    这个函数比较短,可以看到在指定node上查询到匹配selector的pod越多,分值就越高。假设找到了20个,那么这里的分值就是20;假设找到的是2,那这里的分值就是2.

    3.3. CalculateNodeAffinityPriorityReduce(Reduce)

    和上面这个Map对应的Reduce函数其实没有单独实现,通过NormalizeReduce函数做了一个通用的Reduce处理:

    pkg/scheduler/algorithm/priorities/node_affinity.go:77

    var CalculateNodeAffinityPriorityReduce = NormalizeReduce(schedulerapi.MaxPriority, false)
    

    pkg/scheduler/algorithm/priorities/reduce.go:29

    func NormalizeReduce(maxPriority int, reverse bool) algorithm.PriorityReduceFunction {
        return func(
            _ *v1.Pod,
            _ interface{},
            _ map[string]*schedulercache.NodeInfo,
            // 注意到这个result是HostPriorityList,对应1个算法N个node的结果集
            result schedulerapi.HostPriorityList) error {
    
            var maxCount int
            // 遍历result将最高的Score赋值给maxCount;
            for i := range result {
                if result[i].Score > maxCount {
                    maxCount = result[i].Score
                }
            }
    
            if maxCount == 0 {
                if reverse {
                    for i := range result {
                        result[i].Score = maxPriority
                    }
                }
                return nil
            }
    
            for i := range result {
                score := result[i].Score
                // 举个例子:10*(5/20)
                score = maxPriority * score / maxCount
                if reverse {
                    // 如果score是3,得到7;如果score是4,得到6,结果反转;
                    score = maxPriority - score
                }
    
                result[i].Score = score
            }
            return nil
        }
    }
    

    3.4. 小结

    • Function:一个算法一次性计算出所有node的Score,这个Score的范围是规定的[0-10];
    • Map-Reduce:一个Map算法计算1个node的Score,这个Score可以灵活处理,可能是20,可能是-3;Map过程并发进行;最终得到的结果result通过Reduce归约,将这个算法对应的所有node的分值归约为[0-10];

    本节有几张图是goland debug的截图,我们目前还没有提到如何debug;不过本节内容的阅读基本是不影响的。下一节源码分析内容发出来前我会在“环境准备”这一章中增加如何开始debug的内容,大家可以选择开始debug的时机。

  • 相关阅读:
    UIButton(在代码中使用)
    UIButton的常见设置
    设置按钮的背景图片
    Java Web学习总结(15)——JSP指令
    Java Web学习总结(14)——JSP基础语法
    Java Web学习总结(14)——JSP基础语法
    Oracle学习总结(9)—— Oracle 常用的基本操作
    Oracle学习总结(9)—— Oracle 常用的基本操作
    Oracle学习总结(10)——45 个非常有用的 Oracle 查询语句
    Oracle学习总结(10)——45 个非常有用的 Oracle 查询语句
  • 原文地址:https://www.cnblogs.com/cloudgeek/p/10561221.html
Copyright © 2011-2022 走看看