  • Kubernetes Source Code Reading Notes: Scheduler (Part 2)

    The previous article focused on the preparation work the Scheduler does before it actually starts scheduling. In this article we analyze one of the Scheduler's core methods, scheduleOne, to see how the Scheduler actually performs scheduling.

    1. scheduleOne

    pkg/scheduler/scheduler.go
    
    func (sched *Scheduler) scheduleOne() {
        ...
        pod := sched.config.NextPod()
        // pod could be nil when schedulerQueue is closed
        if pod == nil {
            return
        }
        if pod.DeletionTimestamp != nil {
            sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
            klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
            return
        }
    
        klog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)
    
        // Synchronously attempt to find a fit for the pod.
        start := time.Now()
        scheduleResult, err := sched.schedule(pod)
        if err != nil {
            if fitError, ok := err.(*core.FitError); ok {
                if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
                    klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
                        " No preemption is performed.")
                } else {
                    preemptionStartTime := time.Now()
                    sched.preempt(pod, fitError)
                    metrics.PreemptionAttempts.Inc()
                    metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(...)
                    metrics.SchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(...)
                }
                metrics.PodScheduleFailures.Inc()
            } else {
                klog.Errorf("error selecting node for pod: %v", err)
                metrics.PodScheduleErrors.Inc()
            }
            return
        }
        metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
        assumedPod := pod.DeepCopy()
        allBound, err := sched.assumeVolumes(assumedPod, scheduleResult.SuggestedHost)
        if err != nil {
            klog.Errorf("error assuming volumes: %v", err)
            metrics.PodScheduleErrors.Inc()
            return
        }
    
        ...
    
        // assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
        err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
        if err != nil {
            klog.Errorf("error assuming pod: %v", err)
            metrics.PodScheduleErrors.Inc()
            return
        }
        // bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
        go func() {
            // Bind volumes first before Pod
            if !allBound {
                err := sched.bindVolumes(assumedPod)
                if err != nil {
                    klog.Errorf("error binding volumes: %v", err)
                    metrics.PodScheduleErrors.Inc()
                    return
                }
            }
    
            ...
    
            err := sched.bind(assumedPod, &v1.Binding{
                ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
                Target: v1.ObjectReference{
                    Kind: "Node",
                    Name: scheduleResult.SuggestedHost,
                },
            })
            ...
        }()
    }

    The execution breaks down into the following steps:

    (1) Take the next Pod from the queue.

    This step calls the sched.config.NextPod method to pop the next Pod off the podQueue. If the Pod is currently being deleted, it is skipped.

    (2) Schedule the Pod.

    This step calls sched.schedule. The schedule method is simple: it invokes sched.config.Algorithm.Schedule and records any error that occurs.

    One more thing to note: when schedule fails, i.e. the pod does not fit on any host, the preemption logic is triggered by calling the preempt method (provided preemption is enabled for the scheduler).

    The Schedule method lives in pkg/scheduler/core/generic_scheduler.go; let's look at it in detail:

    pkg/scheduler/core/generic_scheduler.go
    
    func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (result ScheduleResult, err error) {
        trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", pod.Namespace, pod.Name))
        defer trace.LogIfLong(100 * time.Millisecond)
    
        if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
            return result, err
        }
    
        nodes, err := nodeLister.List()
        if err != nil {
            return result, err
        }
        if len(nodes) == 0 {
            return result, ErrNoNodesAvailable
        }
    
        if err := g.snapshot(); err != nil {
            return result, err
        }
    
        trace.Step("Computing predicates")
        startPredicateEvalTime := time.Now()
        filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
        if err != nil {
            return result, err
        }
    
        if len(filteredNodes) == 0 {
            return result, &FitError{
                Pod:              pod,
                NumAllNodes:      len(nodes),
                FailedPredicates: failedPredicateMap,
            }
        }
        ...
    
        trace.Step("Prioritizing")
        startPriorityEvalTime := time.Now()
        // When only one node after predicate, just use it.
        if len(filteredNodes) == 1 {
            metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
            return ScheduleResult{
                SuggestedHost:  filteredNodes[0].Name,
                EvaluatedNodes: 1 + len(failedPredicateMap),
                FeasibleNodes:  1,
            }, nil
        }
    
        metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap)
        priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
        if err != nil {
            return result, err
        }
        ...
    
        trace.Step("Selecting host")
    
        host, err := g.selectHost(priorityList)
        return ScheduleResult{
            SuggestedHost:  host,
            EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap),
            FeasibleNodes:  len(filteredNodes),
        }, err
    }

    A trace utility is used here to track how long each phase of the scheduling pass takes.
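
    For reference, here is a minimal, self-contained example of the same trace pattern. Note that the scheduler of this era imports the package as utiltrace from k8s.io/apiserver/pkg/util/trace, while the same utility now lives at k8s.io/utils/trace, so the exact import path depends on the version you are reading; the pod name below is made up.

    package main

    import (
        "time"

        utiltrace "k8s.io/utils/trace"
    )

    func main() {
        trace := utiltrace.New("Scheduling default/nginx")
        // The collected steps are only logged if the whole operation exceeds the threshold.
        defer trace.LogIfLong(100 * time.Millisecond)

        time.Sleep(80 * time.Millisecond)
        trace.Step("Computing predicates")

        time.Sleep(60 * time.Millisecond)
        trace.Step("Prioritizing")
    }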

    The flow of Schedule is: first get the list of Nodes, filter out the Nodes that do not meet the requirements by calling findNodesThatFit, then score the remaining Nodes by calling PrioritizeNodes, and finally call selectHost to pick one and return the scheduling result.

    The concrete implementations of findNodesThatFit, PrioritizeNodes, and selectHost are analyzed in detail below.

    (3) Assume (pre-bind) the Pod onto the node.

    This step first calls sched.assumeVolumes to assume the binding between the Pod and its Volumes, then runs the relevant plugins, and finally calls sched.assume to assume the binding between the Pod and the host. Assuming does not involve any interaction with the API Server; it only updates the state of the pod and its volumes in the scheduler's cache.
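
    To make "assuming" concrete, here is a minimal, self-contained sketch of the idea with a hypothetical in-memory cache standing in for the scheduler's real cache: the Pod copy gets its NodeName set and is recorded in memory only, and no request is sent to the API Server.

    package main

    import (
        "fmt"
        "sync"

        v1 "k8s.io/api/core/v1"
    )

    // assumedCache stands in for the scheduler's internal cache (hypothetical type).
    type assumedCache struct {
        mu   sync.Mutex
        pods map[string]*v1.Pod // key: namespace/name
    }

    // assumePod records the assumption that pod is already running on host.
    func (c *assumedCache) assumePod(pod *v1.Pod, host string) {
        c.mu.Lock()
        defer c.mu.Unlock()
        pod.Spec.NodeName = host
        c.pods[pod.Namespace+"/"+pod.Name] = pod
    }

    func main() {
        cache := &assumedCache{pods: map[string]*v1.Pod{}}
        pod := &v1.Pod{}
        pod.Namespace, pod.Name = "default", "nginx"
        // Like scheduleOne, work on a copy so the original object is untouched.
        cache.assumePod(pod.DeepCopy(), "node-1")
        fmt.Println(cache.pods["default/nginx"].Spec.NodeName) // node-1
    }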

    (4) Perform the actual binding.

    Much like the assume phase above, the actual binding first calls bindVolumes to bind the Pod and its Volumes, then runs the plugins, and finally calls bind to carry out the real binding. The actual binding does interact with the API Server, submitting the pod's updated state to it.
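
    To illustrate what the bind call ultimately amounts to (this is not the scheduler's own code), the snippet below posts a Binding object through client-go, i.e. the pods/binding subresource request that makes the API Server set pod.Spec.NodeName. It assumes a client-go of roughly the same vintage as the code above (~1.13); in newer versions Bind also takes a context and CreateOptions. The pod and node names are placeholders.

    package main

    import (
        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/clientcmd"
    )

    // bindPodToNode submits a Binding for an existing pod, telling the API Server
    // which node the pod should run on.
    func bindPodToNode(client kubernetes.Interface, namespace, podName, nodeName string) error {
        binding := &v1.Binding{
            ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: podName},
            Target:     v1.ObjectReference{Kind: "Node", Name: nodeName},
        }
        return client.CoreV1().Pods(namespace).Bind(binding)
    }

    func main() {
        config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
        if err != nil {
            panic(err)
        }
        if err := bindPodToNode(kubernetes.NewForConfigOrDie(config), "default", "nginx", "node-1"); err != nil {
            panic(err)
        }
    }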

    2. findNodesThatFit

    findNodesThatFit selects all the nodes the Pod can run on.

    pkg/scheduler/core/generic_scheduler.go
    
    func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
        var filtered []*v1.Node
        failedPredicateMap := FailedPredicateMap{}
    
        if len(g.predicates) == 0 {
            filtered = nodes
        } else {
            allNodes := int32(g.cache.NodeTree().NumNodes())
            numNodesToFind := g.numFeasibleNodesToFind(allNodes)
            ...
    
            checkNode := func(i int) {
                nodeName := g.cache.NodeTree().Next()
                fits, failedPredicates, err := podFitsOnNode(
                    pod,
                    meta,
                    g.cachedNodeInfoMap[nodeName],
                    g.predicates,
                    g.schedulingQueue,
                    g.alwaysCheckAllPredicates,
                )
                if err != nil {
                    predicateResultLock.Lock()
                    errs[err.Error()]++
                    predicateResultLock.Unlock()
                    return
                }
                if fits {
                    length := atomic.AddInt32(&filteredLen, 1)
                    if length > numNodesToFind {
                        cancel()
                        atomic.AddInt32(&filteredLen, -1)
                    } else {
                        filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
                    }
                } else {
                    predicateResultLock.Lock()
                    failedPredicateMap[nodeName] = failedPredicates
                    predicateResultLock.Unlock()
                }
            }
    
            // Stops searching for more nodes once the configured number of feasible nodes
            // are found.
            workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)
    
            filtered = filtered[:filteredLen]
            if len(errs) > 0 {
                return []*v1.Node{}, FailedPredicateMap{}, errors.CreateAggregateFromMessageCountMap(errs)
            }
        }
    
        if len(filtered) > 0 && len(g.extenders) != 0 {
            ...
        }
        return filtered, failedPredicateMap, nil
    }

    This method first gets the total number of nodes and, based on that number, calls numFeasibleNodesToFind to determine the maximum number of feasible nodes to look for. Once that many nodes have been found, the scheduler stops searching for more schedulable nodes.
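
    The gist of numFeasibleNodesToFind can be captured by the standalone sketch below. It is a simplification: the real function reads percentageOfNodesToScore from the scheduler configuration and its exact thresholds and defaults vary across Kubernetes versions, so the constant 100 and the sample values here are assumptions for illustration only.

    package main

    import "fmt"

    // minFeasibleNodesToFind: below this cluster size, always check every node (assumed value).
    const minFeasibleNodesToFind = 100

    // numFeasibleNodesToFind returns how many feasible nodes the scheduler should
    // collect before it stops filtering, given the cluster size and the
    // percentageOfNodesToScore setting.
    func numFeasibleNodesToFind(numAllNodes, percentageOfNodesToScore int32) int32 {
        if numAllNodes < minFeasibleNodesToFind ||
            percentageOfNodesToScore <= 0 || percentageOfNodesToScore >= 100 {
            return numAllNodes
        }
        numNodes := numAllNodes * percentageOfNodesToScore / 100
        if numNodes < minFeasibleNodesToFind {
            return minFeasibleNodesToFind
        }
        return numNodes
    }

    func main() {
        fmt.Println(numFeasibleNodesToFind(5000, 50)) // 2500: only half of a large cluster is searched
        fmt.Println(numFeasibleNodesToFind(80, 50))   // 80: small clusters are always searched in full
    }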

    Next, the method defines a checkNode function and runs it concurrently (via ParallelizeUntil, with at most 16 goroutines). checkNode calls podFitsOnNode to test whether the Pod can be scheduled onto a Node and stores the schedulable Nodes in the filtered slice. The extenders logic at the end handles external schedulers that influence the decision and is omitted here.
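
    The concurrency pattern itself is easy to reproduce outside the scheduler. The self-contained example below uses the same workqueue.ParallelizeUntil helper with 16 workers and the same atomic-counter-plus-cancel trick to stop once enough "feasible" nodes have been collected; the node names and the always-passing predicate are made up for the example.

    package main

    import (
        "context"
        "fmt"
        "sync/atomic"

        "k8s.io/client-go/util/workqueue"
    )

    func main() {
        nodes := []string{"node-1", "node-2", "node-3", "node-4", "node-5"}
        numNodesToFind := int32(3) // stop once this many feasible nodes are found

        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        filtered := make([]string, len(nodes))
        var filteredLen int32

        checkNode := func(i int) {
            // Pretend every node passes the predicates in this toy example.
            length := atomic.AddInt32(&filteredLen, 1)
            if length > numNodesToFind {
                cancel() // enough nodes found: stop handing out further work pieces
                atomic.AddInt32(&filteredLen, -1)
                return
            }
            filtered[length-1] = nodes[i]
        }

        // Same shape as findNodesThatFit: up to 16 goroutines, one work piece per node.
        workqueue.ParallelizeUntil(ctx, 16, len(nodes), checkNode)
        fmt.Println(filtered[:atomic.LoadInt32(&filteredLen)])
    }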

    podFitsOnNode tests whether a given pod can be scheduled onto a given node by running the predicates listed in predicatesOrdering in pkg/scheduler/algorithm/predicates/predicates.go against the node, one by one. Notably, podFitsOnNode runs a loop that executes twice, checking the predicates both with the higher-priority pods nominated to the node assumed onto it and without them. Only if both passes succeed does the node pass the predicate phase.
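
    The self-contained sketch below (hypothetical types, with only a CPU check standing in for the full predicate list) illustrates the mechanism: the predicates are evaluated once with the nominated pods' resources counted against the node and once without, and the node only fits if both passes succeed.

    package main

    import "fmt"

    type node struct {
        freeCPU int64 // millicores
    }

    type predicate func(requestCPU int64, n node) bool

    func fitsCPU(requestCPU int64, n node) bool { return requestCPU <= n.freeCPU }

    // podFitsOnNode mimics the two-pass structure: pass 0 assumes the nominated
    // (preempting) pods are already on the node, pass 1 checks the node as it is.
    func podFitsOnNode(requestCPU, nominatedCPU int64, n node, preds []predicate) bool {
        for pass := 0; pass < 2; pass++ {
            nodeToCheck := n
            if pass == 0 {
                nodeToCheck.freeCPU -= nominatedCPU
            }
            for _, p := range preds {
                if !p(requestCPU, nodeToCheck) {
                    return false
                }
            }
        }
        return true
    }

    func main() {
        n := node{freeCPU: 1000}
        fmt.Println(podFitsOnNode(500, 300, n, []predicate{fitsCPU})) // true
        fmt.Println(podFitsOnNode(800, 300, n, []predicate{fitsCPU})) // false: fails once nominated pods are counted
    }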

    The pkg/scheduler/algorithm/predicates/predicates.go package defines all of the predicate checks, such as whether volumes conflict and whether node resources are sufficient. Only nodes that pass every check are added to the filtered slice. These predicates are registered by the init function in pkg/scheduler/algorithmprovider/defaults/register_predicates.go.

    3. PrioritizeNodes

    Once all schedulable nodes have been found, the next phase, prioritization (scoring), begins. It is implemented by PrioritizeNodes.

    pkg/scheduler/core/generic_scheduler.go
    
    func PrioritizeNodes(
        pod *v1.Pod,
        nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
        meta interface{},
        priorityConfigs []algorithm.PriorityConfig,
        nodes []*v1.Node,
        extenders []algorithm.SchedulerExtender,
    ) (schedulerapi.HostPriorityList, error) {
        // If no priority configs are provided, then the EqualPriority function is applied
        // This is required to generate the priority list in the required format
        if len(priorityConfigs) == 0 && len(extenders) == 0 {
            result := make(schedulerapi.HostPriorityList, 0, len(nodes))
            for i := range nodes {
                hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
                if err != nil {
                    return nil, err
                }
                result = append(result, hostPriority)
            }
            return result, nil
        }
    
        var (
            mu   = sync.Mutex{}
            wg   = sync.WaitGroup{}
            errs []error
        )
        appendError := func(err error) {
            mu.Lock()
            defer mu.Unlock()
            errs = append(errs, err)
        }
    
        results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))

        ...
        workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
            nodeInfo := nodeNameToInfo[nodes[index].Name]
            for i := range priorityConfigs {
                if priorityConfigs[i].Function != nil {
                    continue
                }

                var err error
                results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
                if err != nil {
                    appendError(err)
                    results[i][index].Host = nodes[index].Name
                }
            }
        })

        for i := range priorityConfigs {
            if priorityConfigs[i].Reduce == nil {
                continue
            }
            wg.Add(1)
            go func(index int) {
                defer wg.Done()
                if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
                    appendError(err)
                }
                if klog.V(10) {
                    for _, hostPriority := range results[index] {
                        klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Host, priorityConfigs[index].Name, hostPriority.Score)
                    }
                }
            }(i)
        }
        // Wait for all computations to be finished.
        wg.Wait()
        if len(errs) != 0 {
            return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
        }

        // Summarize all scores.
        result := make(schedulerapi.HostPriorityList, 0, len(nodes))

        for i := range nodes {
            result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
            for j := range priorityConfigs {
                result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
            }
        }

        if len(extenders) != 0 && nodes != nil {
            ...
        }

        if klog.V(10) {
            for i := range result {
                klog.Infof("Host %s => Score %d", result[i].Host, result[i].Score)
            }
        }
        return result, nil
    }

    The first step checks whether any priority configurations exist. If not, every node is given the same weight and a uniform node list is returned (that is, the Score field of every HostPriority is set to 1).

    The second step computes each Node's score concurrently using a Map-Reduce approach. As the code shows, the Map-Reduce operations iterate over a priorityConfigs slice, which holds a collection of PriorityConfig structs:

    pkg/scheduler/algorithm/types.go
    
    // PriorityConfig is a config used for a priority function.
    type PriorityConfig struct {
        Name   string
        Map    PriorityMapFunction
        Reduce PriorityReduceFunction
        // TODO: Remove it after migrating all functions to
        // Map-Reduce pattern.
        Function PriorityFunction
        Weight   int
    }

    PriorityConfig is the struct that stores the information about a priority (scoring) algorithm. Each priority algorithm has its own Name, Map (Map function), Reduce (Reduce function), and Weight fields. All of the Map and Reduce functions are defined in the pkg/scheduler/algorithm/priorities package, and they are registered by the init function in pkg/scheduler/algorithmprovider/defaults/register_priorities.go.
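
    Below is a self-contained sketch of that pattern with hypothetical types and a made-up "least requested"-style rule: the Map step scores each node independently, and the Reduce step then normalizes the raw scores across all nodes into the 0-10 range used by schedulers of this era.

    package main

    import "fmt"

    type hostPriority struct {
        Host  string
        Score int
    }

    // mapFreeCPU gives a higher raw score to nodes with more free CPU (illustrative only).
    func mapFreeCPU(freeCPU map[string]int, host string) hostPriority {
        return hostPriority{Host: host, Score: freeCPU[host]}
    }

    // reduceNormalize rescales the raw scores so the best node gets 10.
    func reduceNormalize(result []hostPriority) {
        max := 0
        for _, hp := range result {
            if hp.Score > max {
                max = hp.Score
            }
        }
        if max == 0 {
            return
        }
        for i := range result {
            result[i].Score = result[i].Score * 10 / max
        }
    }

    func main() {
        freeCPU := map[string]int{"node-1": 4000, "node-2": 1000}
        var result []hostPriority
        for _, host := range []string{"node-1", "node-2"} {
            result = append(result, mapFreeCPU(freeCPU, host))
        }
        reduceNormalize(result)
        fmt.Println(result) // [{node-1 10} {node-2 2}]
    }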

    The third step sums up the scores: each algorithm's score for a node is multiplied by that algorithm's weight, accumulated into result, and returned.

    4. selectHost

    The selectHost method is straightforward: from the scored node list produced earlier, it picks one of the nodes with the highest score (rotating among ties via lastNodeIndex) and returns its host.

    pkg/scheduler/core/generic_scheduler.go
    
    // selectHost takes a prioritized list of nodes and then picks one
    // in a round-robin manner from the nodes that had the highest score.
    func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList) (string, error) {
        if len(priorityList) == 0 {
            return "", fmt.Errorf("empty priorityList")
        }
    
        maxScores := findMaxScores(priorityList)
        ix := int(g.lastNodeIndex % uint64(len(maxScores)))
        g.lastNodeIndex++
    
        return priorityList[maxScores[ix]].Host, nil
    }
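
    The findMaxScores helper is not shown above. The standalone re-implementation below (with a local hostPriority type instead of the schedulerapi one) captures what it does: collect the indexes of all entries that share the highest score, so that selectHost can rotate among equally good nodes via lastNodeIndex.

    package main

    import "fmt"

    type hostPriority struct {
        Host  string
        Score int
    }

    // findMaxScores returns the indexes of all hosts whose score equals the maximum.
    func findMaxScores(priorityList []hostPriority) []int {
        maxScoreIndexes := make([]int, 0, len(priorityList))
        maxScore := priorityList[0].Score
        for i, hp := range priorityList {
            switch {
            case hp.Score > maxScore:
                maxScore = hp.Score
                maxScoreIndexes = maxScoreIndexes[:0]
                maxScoreIndexes = append(maxScoreIndexes, i)
            case hp.Score == maxScore:
                maxScoreIndexes = append(maxScoreIndexes, i)
            }
        }
        return maxScoreIndexes
    }

    func main() {
        list := []hostPriority{{"node-1", 7}, {"node-2", 9}, {"node-3", 9}}
        fmt.Println(findMaxScores(list)) // [1 2]
    }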

    5. Summary

    Overall, the Scheduler's scheduling pass consists of roughly four steps: take the next Pod, pick a target Node, assume the binding, and perform the actual binding. The Scheduler does not actually run Pods; all it does is submit the Pod's post-scheduling update to the API Server (implemented in sched.bind).

  • Original article: https://www.cnblogs.com/00986014w/p/10320844.html