Kubernetes Source Code Reading Notes: Scheduler (Part 2)

    The previous article focused mainly on the preparation the Scheduler does before it actually starts scheduling. In this article we analyze one of the Scheduler's core methods, scheduleOne, to see how the Scheduler actually carries out a scheduling pass.

    1. scheduleOne

    pkg/scheduler/scheduler.go
    
    func (sched *Scheduler) scheduleOne() {
        ...
        pod := sched.config.NextPod()
        // pod could be nil when schedulerQueue is closed
        if pod == nil {
            return
        }
        if pod.DeletionTimestamp != nil {
            sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
            klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
            return
        }
    
        klog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)
    
        // Synchronously attempt to find a fit for the pod.
        start := time.Now()
        scheduleResult, err := sched.schedule(pod)
        if err != nil {
            if fitError, ok := err.(*core.FitError); ok {
                if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
                    klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
                        " No preemption is performed.")
                } else {
                    preemptionStartTime := time.Now()
                    sched.preempt(pod, fitError)
                    metrics.PreemptionAttempts.Inc()
                    metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(...)
                    metrics.SchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(...)
                }
                metrics.PodScheduleFailures.Inc()
            } else {
                klog.Errorf("error selecting node for pod: %v", err)
                metrics.PodScheduleErrors.Inc()
            }
            return
        }
        metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
        assumedPod := pod.DeepCopy()
        allBound, err := sched.assumeVolumes(assumedPod, scheduleResult.SuggestedHost)
        if err != nil {
            klog.Errorf("error assuming volumes: %v", err)
            metrics.PodScheduleErrors.Inc()
            return
        }
    
        ...
    
        // assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
        err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
        if err != nil {
            klog.Errorf("error assuming pod: %v", err)
            metrics.PodScheduleErrors.Inc()
            return
        }
        // bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
        go func() {
            // Bind volumes first before Pod
            if !allBound {
                err := sched.bindVolumes(assumedPod)
                if err != nil {
                    klog.Errorf("error binding volumes: %v", err)
                    metrics.PodScheduleErrors.Inc()
                    return
                }
            }
    
            ...
    
            err := sched.bind(assumedPod, &v1.Binding{
                ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
                Target: v1.ObjectReference{
                    Kind: "Node",
                    Name: scheduleResult.SuggestedHost,
                },
            })
            ...
        }()
    }

    The execution breaks down into the following steps:

    (1) Take the next Pod from the queue.

    This step calls sched.config.NextPod, which pops the next Pod from the podQueue. If the Pod is being deleted, it is skipped.
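
    As a rough illustration of the nil check above, here is a minimal, self-contained sketch of a blocking pop. This is not the scheduler's real priority queue; it is a toy FIFO of pod keys whose Pop blocks until an item is available and returns a zero value once the queue has been closed:

    package main

    import (
        "fmt"
        "sync"
    )

    // Toy stand-in for the scheduling queue. Pop blocks until an item is available
    // and returns "" (standing in for a nil pod) once the queue is closed, which is
    // why scheduleOne checks for a nil pod before doing anything else.
    type fifo struct {
        mu     sync.Mutex
        cond   *sync.Cond
        items  []string
        closed bool
    }

    func newFIFO() *fifo {
        f := &fifo{}
        f.cond = sync.NewCond(&f.mu)
        return f
    }

    func (f *fifo) Add(item string) {
        f.mu.Lock()
        defer f.mu.Unlock()
        f.items = append(f.items, item)
        f.cond.Signal()
    }

    func (f *fifo) Close() {
        f.mu.Lock()
        defer f.mu.Unlock()
        f.closed = true
        f.cond.Broadcast()
    }

    func (f *fifo) Pop() string {
        f.mu.Lock()
        defer f.mu.Unlock()
        for len(f.items) == 0 {
            if f.closed {
                return ""
            }
            f.cond.Wait()
        }
        item := f.items[0]
        f.items = f.items[1:]
        return item
    }

    func main() {
        q := newFIFO()
        q.Add("default/nginx")
        fmt.Println(q.Pop())       // "default/nginx"
        q.Close()
        fmt.Println(q.Pop() == "") // true: queue closed, the caller should just return
    }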

    (2) Schedule the Pod.

    This step calls sched.schedule. The schedule method is fairly simple: it invokes sched.config.Algorithm.Schedule and records the error if one occurs.

    One more thing to note: when schedule fails, i.e. the pod does not fit on any host, the preemption logic is triggered by calling the preempt method (provided pod priority is enabled and preemption has not been disabled in the scheduler configuration).

    The Schedule method lives in pkg/scheduler/core/generic_scheduler.go; let's look at it in detail:

    pkg/scheduler/core/generic_scheduler.go
    
    func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (result ScheduleResult, err error) {
        trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", pod.Namespace, pod.Name))
        defer trace.LogIfLong(100 * time.Millisecond)
    
        if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
            return result, err
        }
    
        nodes, err := nodeLister.List()
        if err != nil {
            return result, err
        }
        if len(nodes) == 0 {
            return result, ErrNoNodesAvailable
        }
    
        if err := g.snapshot(); err != nil {
            return result, err
        }
    
        trace.Step("Computing predicates")
        startPredicateEvalTime := time.Now()
        filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
        if err != nil {
            return result, err
        }
    
        if len(filteredNodes) == 0 {
            return result, &FitError{
                Pod:              pod,
                NumAllNodes:      len(nodes),
                FailedPredicates: failedPredicateMap,
            }
        }
        ...
    
        trace.Step("Prioritizing")
        startPriorityEvalTime := time.Now()
        // When only one node after predicate, just use it.
        if len(filteredNodes) == 1 {
            metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
            return ScheduleResult{
                SuggestedHost:  filteredNodes[0].Name,
                EvaluatedNodes: 1 + len(failedPredicateMap),
                FeasibleNodes:  1,
            }, nil
        }
    
        metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap)
        priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
        if err != nil {
            return result, err
        }
        ...
    
        trace.Step("Selecting host")
    
        host, err := g.selectHost(priorityList)
        return ScheduleResult{
            SuggestedHost:  host,
            EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap),
            FeasibleNodes:  len(filteredNodes),
        }, err
    }

    A trace utility is used here to follow the scheduling process and log it if it takes too long.

    The scheduling flow is: first fetch the list of Nodes, filter out the Nodes that do not meet the Pod's requirements by calling findNodesThatFit, then score the remaining Nodes by calling PrioritizeNodes, and finally call selectHost to pick one and return the scheduling result.

    The implementations of findNodesThatFit, PrioritizeNodes, and selectHost are analyzed in detail below.

    (3) Assume (pre-bind) the Pod onto the node.

    This step calls sched.assumeVolumes to assume the Pod's volumes, then runs the relevant plugins, and finally calls sched.assume to assume the Pod onto the chosen host. Assuming involves no interaction with the API Server; it only updates the state of the pod and its volumes in the scheduler's cache.
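
    To make "cache only" concrete, here is a toy sketch of the idea behind assuming. The real scheduler cache tracks full NodeInfo objects, expiration, and more; the types below are simplified stand-ins, not the actual cache API:

    package main

    import (
        "fmt"
        "sync"
    )

    // assumeCache is a toy stand-in for the scheduler cache: "assuming" a pod only
    // records the pod->node decision in memory, so later scheduling decisions already
    // see the node as occupied. Nothing is written to the API Server at this point.
    type assumeCache struct {
        mu          sync.Mutex
        assumedNode map[string]string // pod key -> node name
    }

    func newAssumeCache() *assumeCache {
        return &assumeCache{assumedNode: map[string]string{}}
    }

    // AssumePod mirrors the intent of sched.assume.
    func (c *assumeCache) AssumePod(podKey, node string) {
        c.mu.Lock()
        defer c.mu.Unlock()
        c.assumedNode[podKey] = node
    }

    // ForgetPod is what a scheduler would call if the later, real binding fails.
    func (c *assumeCache) ForgetPod(podKey string) {
        c.mu.Lock()
        defer c.mu.Unlock()
        delete(c.assumedNode, podKey)
    }

    func main() {
        cache := newAssumeCache()
        cache.AssumePod("default/nginx", "node-1")
        fmt.Println(cache.assumedNode["default/nginx"]) // "node-1" (in memory only)
    }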

    (4) Perform the binding.

    Much like the assume step, the real binding first calls bindVolumes to bind the Pod's volumes, then runs the plugins, and finally calls bind to perform the actual binding. The real binding does interact with the API Server: it submits the pod's updated state to it.
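
    The binding itself boils down to posting a v1.Binding object to the pod's binding subresource. The sketch below shows that with client-go. It assumes a client-go version contemporary with this scheduler code, where PodInterface exposes Bind(*v1.Binding) error (newer releases add context and options arguments), and the kubeconfig location is only an example:

    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/clientcmd"
    )

    // bindPod posts a Binding object to the pod's "binding" subresource, which is all
    // the final bind step amounts to. Assumes the pre-context client-go signature
    // Pods(ns).Bind(*v1.Binding).
    func bindPod(client kubernetes.Interface, pod *v1.Pod, node string) error {
        binding := &v1.Binding{
            ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name, UID: pod.UID},
            Target:     v1.ObjectReference{Kind: "Node", Name: node},
        }
        return client.CoreV1().Pods(pod.Namespace).Bind(binding)
    }

    func main() {
        // The kubeconfig location is just an example; in-cluster config works the same way.
        config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
        if err != nil {
            panic(err)
        }
        client := kubernetes.NewForConfigOrDie(config)

        pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "nginx"}}
        if err := bindPod(client, pod, "node-1"); err != nil {
            fmt.Println("bind failed:", err)
        }
    }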

    2. findNodesThatFit

    findNodesThatFit selects all the nodes on which the Pod can run.

    pkg/scheduler/core/generic_scheduler.go
    
    func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
        var filtered []*v1.Node
        failedPredicateMap := FailedPredicateMap{}
    
        if len(g.predicates) == 0 {
            filtered = nodes
        } else {
            allNodes := int32(g.cache.NodeTree().NumNodes())
            numNodesToFind := g.numFeasibleNodesToFind(allNodes)
            ...
    
            checkNode := func(i int) {
                nodeName := g.cache.NodeTree().Next()
                fits, failedPredicates, err := podFitsOnNode(
                    pod,
                    meta,
                    g.cachedNodeInfoMap[nodeName],
                    g.predicates,
                    g.schedulingQueue,
                    g.alwaysCheckAllPredicates,
                )
                if err != nil {
                    predicateResultLock.Lock()
                    errs[err.Error()]++
                    predicateResultLock.Unlock()
                    return
                }
                if fits {
                    length := atomic.AddInt32(&filteredLen, 1)
                    if length > numNodesToFind {
                        cancel()
                        atomic.AddInt32(&filteredLen, -1)
                    } else {
                        filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
                    }
                } else {
                    predicateResultLock.Lock()
                    failedPredicateMap[nodeName] = failedPredicates
                    predicateResultLock.Unlock()
                }
            }
    
            // Stops searching for more nodes once the configured number of feasible nodes
            // are found.
            workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)
    
            filtered = filtered[:filteredLen]
            if len(errs) > 0 {
                return []*v1.Node{}, FailedPredicateMap{}, errors.CreateAggregateFromMessageCountMap(errs)
            }
        }
    
        if len(filtered) > 0 && len(g.extenders) != 0 {
            ...
        }
        return filtered, failedPredicateMap, nil
    }

    This method first gets the total number of nodes and, based on that number, calls numFeasibleNodesToFind to determine the maximum number of feasible nodes to look for. Once that many feasible nodes have been found, the scheduler stops searching for more schedulable nodes.
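
    The sketch below shows the shape of that calculation. It assumes a fixed percentageOfNodesToScore is configured and a floor of 100 nodes; the real numFeasibleNodesToFind also derives an adaptive default percentage from the cluster size, which is omitted here:

    package main

    import "fmt"

    // Simplified sketch of numFeasibleNodesToFind, assuming a fixed
    // percentageOfNodesToScore; the adaptive default of the real implementation is omitted.
    const minFeasibleNodesToFind = 100

    func numFeasibleNodesToFind(numAllNodes, percentageOfNodesToScore int32) int32 {
        if numAllNodes < minFeasibleNodesToFind || percentageOfNodesToScore >= 100 {
            return numAllNodes
        }
        numNodes := numAllNodes * percentageOfNodesToScore / 100
        if numNodes < minFeasibleNodesToFind {
            return minFeasibleNodesToFind
        }
        return numNodes
    }

    func main() {
        fmt.Println(numFeasibleNodesToFind(50, 30))   // 50: small clusters are always searched in full
        fmt.Println(numFeasibleNodesToFind(5000, 30)) // 1500: the search stops after 1500 feasible nodes
    }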

    Next, the method defines a checkNode function and runs it concurrently (via ParallelizeUntil, with up to 16 workers). checkNode calls podFitsOnNode to check whether the Pod can be scheduled onto a Node and stores the schedulable Nodes in the filtered slice. The extenders handled at the end are for external components that influence scheduling and are omitted here.
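
    The parallel filtering pattern can be reproduced in isolation. The sketch below uses workqueue.ParallelizeUntil from k8s.io/client-go just like the code above, but the node list and the "fits" rule are toy stand-ins for the real predicate checks:

    package main

    import (
        "context"
        "fmt"
        "sync/atomic"

        "k8s.io/client-go/util/workqueue"
    )

    // 16 workers check nodes concurrently and cancel the remaining work once enough
    // feasible nodes have been collected, mirroring the checkNode closure above.
    func main() {
        nodes := make([]string, 1000)
        for i := range nodes {
            nodes[i] = fmt.Sprintf("node-%d", i)
        }

        const numNodesToFind = 50
        filtered := make([]string, numNodesToFind)
        var filteredLen int32

        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        checkNode := func(i int) {
            if i%3 == 0 { // toy predicate: pretend every third node does not fit
                return
            }
            length := atomic.AddInt32(&filteredLen, 1)
            if length > numNodesToFind {
                cancel() // enough feasible nodes found: stop handing out new work
                atomic.AddInt32(&filteredLen, -1)
                return
            }
            filtered[length-1] = nodes[i]
        }

        workqueue.ParallelizeUntil(ctx, 16, len(nodes), checkNode)
        fmt.Println("feasible nodes found:", atomic.LoadInt32(&filteredLen)) // 50
    }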

    podFitsOnNode checks whether the given pod can be scheduled onto the given node by running the node through the predicates in the predicatesOrdering list in pkg/scheduler/algorithm/predicates/predicates.go, one by one. It is worth noting that podFitsOnNode runs a loop that executes twice: the first pass checks the node with the higher-priority nominated (preempting) pods added to it, and the second pass checks the node without them. Only if both passes succeed does the node pass the predicate phase.
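
    A toy version of that two-pass idea, with a single made-up CPU predicate instead of the real predicate list, looks like this:

    package main

    import "fmt"

    type nodeInfo struct {
        allocatableCPU int
        requestedCPU   int
    }

    type pod struct {
        name string
        cpu  int
    }

    // fitsCPU is a stand-in for a single predicate.
    func fitsCPU(p pod, n nodeInfo) bool {
        return n.requestedCPU+p.cpu <= n.allocatableCPU
    }

    // fitsTwice mimics the two-pass idea: the first pass pretends the nominated
    // (preempting) pods already run on the node, the second pass checks the node as
    // it is. The pod only "fits" if both passes succeed.
    func fitsTwice(p pod, n nodeInfo, nominated []pod) bool {
        for pass := 0; pass < 2; pass++ {
            view := n
            if pass == 0 {
                for _, np := range nominated {
                    view.requestedCPU += np.cpu
                }
            }
            if !fitsCPU(p, view) {
                return false
            }
        }
        return true
    }

    func main() {
        n := nodeInfo{allocatableCPU: 4, requestedCPU: 1}
        nominated := []pod{{name: "high-prio", cpu: 2}}
        fmt.Println(fitsTwice(pod{name: "web", cpu: 1}, n, nominated)) // true
        fmt.Println(fitsTwice(pod{name: "big", cpu: 3}, n, nominated)) // false: fails once nominated pods are counted
    }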

    All the predicate functions are defined in the pkg/scheduler/algorithm/predicates/predicates.go package, covering checks such as whether volumes conflict and whether the node has enough resources. Only nodes that pass every predicate are added to the filtered slice. These predicates are registered by the init function in pkg/scheduler/algorithmprovider/defaults/register_predicates.go.
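
    The registration itself is the ordinary Go init() pattern: importing the package fills a package-level registry. Below is a generic sketch of that pattern with an illustrative predicate, not the scheduler's real factory API:

    package main

    import "fmt"

    type fitPredicate func(node string) bool

    // Package-level registry, filled as a side effect of package initialization.
    var fitPredicateRegistry = map[string]fitPredicate{}

    func registerFitPredicate(name string, p fitPredicate) {
        fitPredicateRegistry[name] = p
    }

    // init runs when the package is imported, just like register_predicates.go does
    // for the real predicates.
    func init() {
        registerFitPredicate("ToyNodeNotTainted", func(node string) bool {
            return node != "tainted-node"
        })
    }

    func main() {
        for name, pred := range fitPredicateRegistry {
            fmt.Println(name, "on node-1:", pred("node-1"))
        }
    }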

    3. PrioritizeNodes

    Once all the schedulable nodes have been found, scheduling enters the next phase, the scoring (priority) phase, implemented by PrioritizeNodes.

    pkg/scheduler/core/generic_scheduler.go
    
    func PrioritizeNodes(
        pod *v1.Pod,
        nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
        meta interface{},
        priorityConfigs []algorithm.PriorityConfig,
        nodes []*v1.Node,
        extenders []algorithm.SchedulerExtender,
    ) (schedulerapi.HostPriorityList, error) {
        // If no priority configs are provided, then the EqualPriority function is applied
        // This is required to generate the priority list in the required format
        if len(priorityConfigs) == 0 && len(extenders) == 0 {
            result := make(schedulerapi.HostPriorityList, 0, len(nodes))
            for i := range nodes {
                hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
                if err != nil {
                    return nil, err
                }
                result = append(result, hostPriority)
            }
            return result, nil
        }
    
        var (
            mu   = sync.Mutex{}
            wg   = sync.WaitGroup{}
            errs []error
        )
        appendError := func(err error) {
            mu.Lock()
            defer mu.Unlock()
            errs = append(errs, err)
        }
    
        results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))
    
        ...
        workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
            nodeInfo := nodeNameToInfo[nodes[index].Name]
            for i := range priorityConfigs {
                if priorityConfigs[i].Function != nil {
                    continue
                }

                var err error
                results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
                if err != nil {
                    appendError(err)
                    results[i][index].Host = nodes[index].Name
                }
            }
        })

        for i := range priorityConfigs {
            if priorityConfigs[i].Reduce == nil {
                continue
            }
            wg.Add(1)
            go func(index int) {
                defer wg.Done()
                if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
                    appendError(err)
                }
                if klog.V(10) {
                    for _, hostPriority := range results[index] {
                        klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Host, priorityConfigs[index].Name, hostPriority.Score)
                    }
                }
            }(i)
        }
        // Wait for all computations to be finished.
        wg.Wait()
        if len(errs) != 0 {
            return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
        }
    
        // Summarize all scores.
        result := make(schedulerapi.HostPriorityList, 0, len(nodes))
    
        for i := range nodes {
            result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
            for j := range priorityConfigs {
                result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
            }
        }
    
        if len(extenders) != 0 && nodes != nil {
            ...
        }
    
        if klog.V(10) {
            for i := range result {
                klog.Infof("Host %s => Score %d", result[i].Host, result[i].Score)
            }
        }
        return result, nil
    }

    The first step checks whether any priority configurations (or extenders) exist; if not, every node is given the same score and an equal node list is returned (the Score field of each HostPriority is set to 1).
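
    A compact sketch of that fallback, with a simplified type standing in for schedulerapi.HostPriorityList:

    package main

    import "fmt"

    type hostPriority struct {
        Host  string
        Score int
    }

    // Sketch of the equal-priority fallback: with no priority configs every node gets
    // the same score, so the later selection degenerates into round-robin among all
    // feasible nodes.
    func equalPriority(nodes []string) []hostPriority {
        result := make([]hostPriority, 0, len(nodes))
        for _, n := range nodes {
            result = append(result, hostPriority{Host: n, Score: 1})
        }
        return result
    }

    func main() {
        fmt.Println(equalPriority([]string{"node-1", "node-2"})) // [{node-1 1} {node-2 1}]
    }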

    The second step computes each Node's score concurrently using a Map-Reduce pattern. The Map-Reduce operations iterate over the priorityConfigs slice, which holds a collection of PriorityConfig structs:

    pkg/scheduler/algorithm/types.go
    
    // PriorityConfig is a config used for a priority function.
    type PriorityConfig struct {
        Name   string
        Map    PriorityMapFunction
        Reduce PriorityReduceFunction
        // TODO: Remove it after migrating all functions to
        // Map-Reduce pattern.
        Function PriorityFunction
        Weight   int
    }

    PriorityConfig is the struct that holds the information about a priority (scoring) algorithm. Each priority algorithm has its own Name, Map (the Map function), Reduce (the Reduce function), and Weight fields. All the Map and Reduce functions are defined in the pkg/scheduler/algorithm/priorities package, and they are registered by the init function in pkg/scheduler/algorithmprovider/defaults/register_priorities.go.
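
    The sketch below mimics that Map-Reduce shape with a single made-up "free CPU" priority: Map produces a raw per-node score, Reduce normalizes the scores into the 0..10 range, and the final score is the weighted sum across priorities (which is what the third step described next does). The types and the scoring rule are illustrative, not the scheduler's real ones:

    package main

    import "fmt"

    type nodeInfo struct {
        name    string
        freeCPU int64
    }

    type hostPriority struct {
        Host  string
        Score int64
    }

    // priorityConfig mirrors the shape of PriorityConfig with toy function types.
    type priorityConfig struct {
        Name   string
        Map    func(n nodeInfo) hostPriority
        Reduce func(scores []hostPriority)
        Weight int64
    }

    func main() {
        nodes := []nodeInfo{{"node-1", 2000}, {"node-2", 500}}

        leastUsed := priorityConfig{
            Name: "ToyLeastRequested",
            // Map: raw per-node score (here simply the free CPU).
            Map: func(n nodeInfo) hostPriority { return hostPriority{Host: n.name, Score: n.freeCPU} },
            // Reduce: normalize the raw scores into the 0..10 range.
            Reduce: func(scores []hostPriority) {
                var max int64
                for _, s := range scores {
                    if s.Score > max {
                        max = s.Score
                    }
                }
                for i := range scores {
                    if max > 0 {
                        scores[i].Score = scores[i].Score * 10 / max
                    }
                }
            },
            Weight: 1,
        }
        configs := []priorityConfig{leastUsed}

        // Map phase: one score list per priority, one entry per node.
        results := make([][]hostPriority, len(configs))
        for i, cfg := range configs {
            results[i] = make([]hostPriority, len(nodes))
            for j, n := range nodes {
                results[i][j] = cfg.Map(n)
            }
        }
        // Reduce phase.
        for i, cfg := range configs {
            cfg.Reduce(results[i])
        }
        // Weighted sum per node (the "summarize all scores" step).
        final := make([]hostPriority, 0, len(nodes))
        for j, n := range nodes {
            hp := hostPriority{Host: n.name}
            for i, cfg := range configs {
                hp.Score += results[i][j].Score * cfg.Weight
            }
            final = append(final, hp)
        }
        fmt.Println(final) // [{node-1 10} {node-2 2}]
    }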

    The third step summarizes the scores: for each node, every algorithm's score is multiplied by that algorithm's weight and summed, and the totals are stored in result and returned.

    4. selectHost

    The selectHost method is straightforward: it picks a node with the highest score from the scored node list and returns its host, rotating round-robin among ties.

    pkg/scheduler/core/generic_scheduler.go
    
    // selectHost takes a prioritized list of nodes and then picks one
    // in a round-robin manner from the nodes that had the highest score.
    func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList) (string, error) {
        if len(priorityList) == 0 {
            return "", fmt.Errorf("empty priorityList")
        }
    
        maxScores := findMaxScores(priorityList)
        ix := int(g.lastNodeIndex % uint64(len(maxScores)))
        g.lastNodeIndex++
    
        return priorityList[maxScores[ix]].Host, nil
    }
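
    The findMaxScores helper referenced above is not shown in the excerpt; the sketch below reproduces the behavior with simplified types: collect the indexes of all entries that share the maximum score, then rotate through them with lastNodeIndex so that ties are spread round-robin across scheduling cycles:

    package main

    import "fmt"

    type hostPriority struct {
        Host  string
        Score int
    }

    // findMaxScores returns the indexes of all entries sharing the highest score.
    func findMaxScores(priorityList []hostPriority) []int {
        maxIndexes := []int{0}
        maxScore := priorityList[0].Score
        for i := 1; i < len(priorityList); i++ {
            switch {
            case priorityList[i].Score > maxScore:
                maxScore = priorityList[i].Score
                maxIndexes = []int{i}
            case priorityList[i].Score == maxScore:
                maxIndexes = append(maxIndexes, i)
            }
        }
        return maxIndexes
    }

    func main() {
        list := []hostPriority{{"node-1", 8}, {"node-2", 10}, {"node-3", 10}}
        var lastNodeIndex uint64
        for i := 0; i < 3; i++ {
            maxScores := findMaxScores(list)
            ix := int(lastNodeIndex % uint64(len(maxScores)))
            lastNodeIndex++
            fmt.Println(list[maxScores[ix]].Host) // node-2, node-3, node-2
        }
    }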

    5. Summary

    Overall, the Scheduler's scheduling process consists of roughly four steps: taking the next Pod, selecting a target Node, assuming (pre-binding), and the actual binding. The Scheduler does not run Pods itself; all it does is submit the Pod's post-scheduling update to the API Server (implemented in sched.bind).
