The previous article focused on the preparation work the Scheduler performs before it actually starts scheduling. In this article we analyze one of the Scheduler's core methods, scheduleOne, to see how scheduling is actually carried out.
1. scheduleOne
pkg/scheduler/scheduler.go

func (sched *Scheduler) scheduleOne() {
    ...
    pod := sched.config.NextPod()
    // pod could be nil when schedulerQueue is closed
    if pod == nil {
        return
    }
    if pod.DeletionTimestamp != nil {
        sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        return
    }

    klog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)

    // Synchronously attempt to find a fit for the pod.
    start := time.Now()
    scheduleResult, err := sched.schedule(pod)
    if err != nil {
        if fitError, ok := err.(*core.FitError); ok {
            if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
                klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
                    " No preemption is performed.")
            } else {
                preemptionStartTime := time.Now()
                sched.preempt(pod, fitError)
                metrics.PreemptionAttempts.Inc()
                metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(...)
                metrics.SchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(...)
            }
            metrics.PodScheduleFailures.Inc()
        } else {
            klog.Errorf("error selecting node for pod: %v", err)
            metrics.PodScheduleErrors.Inc()
        }
        return
    }
    metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))

    assumedPod := pod.DeepCopy()

    allBound, err := sched.assumeVolumes(assumedPod, scheduleResult.SuggestedHost)
    if err != nil {
        klog.Errorf("error assuming volumes: %v", err)
        metrics.PodScheduleErrors.Inc()
        return
    }
    ...
    // assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
    err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
    if err != nil {
        klog.Errorf("error assuming pod: %v", err)
        metrics.PodScheduleErrors.Inc()
        return
    }
    // bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
    go func() {
        // Bind volumes first before Pod
        if !allBound {
            err := sched.bindVolumes(assumedPod)
            if err != nil {
                klog.Errorf("error binding volumes: %v", err)
                metrics.PodScheduleErrors.Inc()
                return
            }
        }
        ...
        err := sched.bind(assumedPod, &v1.Binding{
            ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
            Target: v1.ObjectReference{
                Kind: "Node",
                Name: scheduleResult.SuggestedHost,
            },
        })
        ...
    }()
}
The execution breaks down into the following steps:
(1) Take the next Pod from the queue.
This step is implemented by calling sched.config.NextPod, which pops the next Pod from the podQueue. If the Pod is being deleted, it is skipped.
(2) Schedule the Pod.
This step is implemented by sched.schedule. The schedule method itself is simple: it calls sched.config.Algorithm.Schedule and records the failure if an error occurs.
One more thing to note: when schedule fails, i.e. the Pod does not fit on any host, the preemption logic is triggered by calling preempt (provided preemption is enabled in the scheduler).
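For reference, the schedule wrapper that scheduleOne calls looks roughly like the following (a paraphrased sketch, not the verbatim source; the failure-recording path is abbreviated):

    func (sched *Scheduler) schedule(pod *v1.Pod) (core.ScheduleResult, error) {
        result, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister)
        if err != nil {
            // Record the failure so the pod is re-queued and, if applicable, preempted for later.
            pod = pod.DeepCopy()
            sched.recordSchedulingFailure(pod, err, v1.PodReasonUnschedulable, err.Error())
            return core.ScheduleResult{}, err
        }
        return result, nil
    }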
The Schedule method lives in pkg/scheduler/core/generic_scheduler.go; let's look at it in detail:
pkg/scheduler/core/generic_scheduler.go

func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (result ScheduleResult, err error) {
    trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", pod.Namespace, pod.Name))
    defer trace.LogIfLong(100 * time.Millisecond)

    if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
        return result, err
    }

    nodes, err := nodeLister.List()
    if err != nil {
        return result, err
    }
    if len(nodes) == 0 {
        return result, ErrNoNodesAvailable
    }

    if err := g.snapshot(); err != nil {
        return result, err
    }

    trace.Step("Computing predicates")
    startPredicateEvalTime := time.Now()
    filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
    if err != nil {
        return result, err
    }

    if len(filteredNodes) == 0 {
        return result, &FitError{
            Pod:              pod,
            NumAllNodes:      len(nodes),
            FailedPredicates: failedPredicateMap,
        }
    }
    ...
    trace.Step("Prioritizing")
    startPriorityEvalTime := time.Now()
    // When only one node after predicate, just use it.
    if len(filteredNodes) == 1 {
        metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
        return ScheduleResult{
            SuggestedHost:  filteredNodes[0].Name,
            EvaluatedNodes: 1 + len(failedPredicateMap),
            FeasibleNodes:  1,
        }, nil
    }

    metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap)
    priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
    if err != nil {
        return result, err
    }
    ...
    trace.Step("Selecting host")
    host, err := g.selectHost(priorityList)
    return ScheduleResult{
        SuggestedHost:  host,
        EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap),
        FeasibleNodes:  len(filteredNodes),
    }, err
}
A trace utility is used here to track the scheduling process and log phases that take too long.
The scheduling flow is: first get the list of Nodes, filter out the Nodes that do not fit by calling findNodesThatFit, then score the remaining Nodes by calling PrioritizeNodes, and finally call selectHost to pick one and return the scheduling result.
The implementations of findNodesThatFit, PrioritizeNodes and selectHost are analyzed in detail below.
(3) Assume (pre-bind) the Pod onto the node.
This step calls sched.assumeVolumes to pre-bind the Pod's volumes, then runs the relevant plugins, and finally calls sched.assume to pre-bind the Pod to the host. The assume step does not talk to the API Server at all; it only updates the pod and volume state in the scheduler's cache.
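Roughly, sched.assume does the following (a paraphrased sketch, failure recording elided): it writes the suggested host into the in-memory Pod object and adds the Pod to the scheduler cache as "assumed", so subsequent scheduling cycles immediately account for its resource usage without waiting for the bind to be confirmed:

    func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
        // Record the decision locally: the pod now "occupies" the host in the cache.
        assumed.Spec.NodeName = host
        if err := sched.config.SchedulerCache.AssumePod(assumed); err != nil {
            klog.Errorf("scheduler cache AssumePod failed: %v", err)
            return err
        }
        return nil
    }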
(4) Perform the actual binding.
Similar to the assume step above, the real binding first calls bindVolumes to bind the Pod's volumes, then runs the plugins, and finally calls bind to perform the actual binding. Unlike the assume step, this one does interact with the API Server and submits the Pod's updated state to it.
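Roughly, sched.bind looks like the paraphrased sketch below: the configured binder POSTs the Binding object to the API Server, and the cache is then told that the binding attempt has finished so the assumed Pod can expire if it is never confirmed:

    func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error {
        // GetBinder returns a binder whose Bind call submits the Binding to the API Server.
        err := sched.config.GetBinder(assumed).Bind(b)
        if finErr := sched.config.SchedulerCache.FinishBinding(assumed); finErr != nil {
            klog.Errorf("scheduler cache FinishBinding failed: %v", finErr)
        }
        if err != nil {
            // On failure the assumed pod is forgotten and the failure recorded (elided here).
            return err
        }
        return nil
    }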
2. findNodesThatFit
findNodesThatFit picks out the nodes on which the Pod can run.
pkg/scheduler/core/generic_scheduler.go

func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
    var filtered []*v1.Node
    failedPredicateMap := FailedPredicateMap{}

    if len(g.predicates) == 0 {
        filtered = nodes
    } else {
        allNodes := int32(g.cache.NodeTree().NumNodes())
        numNodesToFind := g.numFeasibleNodesToFind(allNodes)
        ...
        checkNode := func(i int) {
            nodeName := g.cache.NodeTree().Next()
            fits, failedPredicates, err := podFitsOnNode(
                pod,
                meta,
                g.cachedNodeInfoMap[nodeName],
                g.predicates,
                g.schedulingQueue,
                g.alwaysCheckAllPredicates,
            )
            if err != nil {
                predicateResultLock.Lock()
                errs[err.Error()]++
                predicateResultLock.Unlock()
                return
            }
            if fits {
                length := atomic.AddInt32(&filteredLen, 1)
                if length > numNodesToFind {
                    cancel()
                    atomic.AddInt32(&filteredLen, -1)
                } else {
                    filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
                }
            } else {
                predicateResultLock.Lock()
                failedPredicateMap[nodeName] = failedPredicates
                predicateResultLock.Unlock()
            }
        }

        // Stops searching for more nodes once the configured number of feasible nodes
        // are found.
        workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)

        filtered = filtered[:filteredLen]
        if len(errs) > 0 {
            return []*v1.Node{}, FailedPredicateMap{}, errors.CreateAggregateFromMessageCountMap(errs)
        }
    }

    if len(filtered) > 0 && len(g.extenders) != 0 {
        ...
    }
    return filtered, failedPredicateMap, nil
}
The method first gets the total number of nodes and, based on that number, calls numFeasibleNodesToFind to compute an upper bound on how many feasible nodes to look for. Once that many have been found, the scheduler stops searching for more schedulable nodes.
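The idea behind numFeasibleNodesToFind can be sketched as follows (a simplified, self-contained illustration; the real method also applies an adaptive formula and its constants may differ):

    // minFeasibleNodesToFind is the floor below which every node is always checked.
    const minFeasibleNodesToFind = 100

    // numFeasibleNodesToFindSketch caps the search at a percentage of the cluster,
    // so very large clusters do not have every node evaluated for every pod.
    func numFeasibleNodesToFindSketch(allNodes, percentageOfNodesToScore int32) int32 {
        if allNodes < minFeasibleNodesToFind || percentageOfNodesToScore >= 100 {
            return allNodes
        }
        numNodes := allNodes * percentageOfNodesToScore / 100
        if numNodes < minFeasibleNodesToFind {
            return minFeasibleNodesToFind
        }
        return numNodes
    }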
Next, the method defines a checkNode function and runs it concurrently (via workqueue.ParallelizeUntil, with up to 16 workers). checkNode calls podFitsOnNode to check whether the Pod can be scheduled onto a given Node, and stores the schedulable Nodes in the filtered slice. The extenders block afterwards handles external schedulers that can influence the decision, which we skip here.
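The fan-out/early-stop pattern can be illustrated with a small self-contained example of workqueue.ParallelizeUntil (toy logic, not scheduler code). Note that, as in the real listing above, a few in-flight workers may overshoot the target, which findNodesThatFit corrects by decrementing the counter:

    package main

    import (
        "context"
        "fmt"
        "sync/atomic"

        "k8s.io/client-go/util/workqueue"
    )

    func main() {
        const allNodes = 100
        const numNodesToFind = 10
        var found int32

        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        checkNode := func(i int) {
            // Pretend even-numbered nodes fit the pod.
            if i%2 == 0 {
                if atomic.AddInt32(&found, 1) >= numNodesToFind {
                    cancel() // enough feasible nodes: stop handing out further work pieces
                }
            }
        }

        // 16 workers check the "nodes" concurrently, stopping early once cancelled.
        workqueue.ParallelizeUntil(ctx, 16, allNodes, checkNode)
        fmt.Println("feasible nodes found:", atomic.LoadInt32(&found))
    }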
podFitsOnNode checks whether the given pod can be scheduled onto the given node by running the predicates listed in predicatesOrdering in pkg/scheduler/algorithm/predicates/predicates.go against the node, one by one. Note that podFitsOnNode runs a loop that executes twice: the predicates are evaluated once as if the node's nominated (preempting) pods were already running on it, and once without them. Only if both passes succeed does the node count as feasible.
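The two-pass idea can be sketched with plain types (a simplified illustration with hypothetical names, not the real podFitsOnNode):

    // predicate reports whether the pod fits given the pods already on the node.
    type predicate func(pod string, podsOnNode []string) bool

    // podFitsOnNodeSketch runs every predicate twice: once as if the node's
    // nominated (preempting) pods were already placed on it, and once without
    // them. The node counts as feasible only if both passes succeed.
    func podFitsOnNodeSketch(pod string, nodePods, nominatedPods []string, preds []predicate) bool {
        for _, withNominated := range []bool{true, false} {
            pods := nodePods
            if withNominated {
                pods = append(append([]string{}, nodePods...), nominatedPods...)
            }
            for _, p := range preds {
                if !p(pod, pods) {
                    return false
                }
            }
        }
        return true
    }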
All of the predicate checks are defined in pkg/scheduler/algorithm/predicates/predicates.go, covering things such as volume conflicts and whether a node has sufficient resources. Only nodes that pass every predicate are added to the filtered slice. These predicates are registered in the init function of pkg/scheduler/algorithmprovider/defaults/register_predicates.go.
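For reference, each predicate follows the FitPredicate signature, and registration reads roughly as follows (paraphrased, not verbatim source):

    // A predicate takes the pod, precomputed metadata and the node's cached info,
    // and reports whether the pod fits plus the failure reasons if it does not.
    type FitPredicate func(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error)

    // In register_predicates.go, each predicate is registered by name in init(), e.g.:
    //   factory.RegisterFitPredicate(predicates.NoDiskConflictPred, predicates.NoDiskConflict)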
3. PrioritizeNodes
Once all feasible nodes have been found, the next phase begins: prioritization, implemented by PrioritizeNodes.
pkg/scheduler/core/generic_scheduler.go

func PrioritizeNodes(
    pod *v1.Pod,
    nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
    meta interface{},
    priorityConfigs []algorithm.PriorityConfig,
    nodes []*v1.Node,
    extenders []algorithm.SchedulerExtender,
) (schedulerapi.HostPriorityList, error) {
    // If no priority configs are provided, then the EqualPriority function is applied
    // This is required to generate the priority list in the required format
    if len(priorityConfigs) == 0 && len(extenders) == 0 {
        result := make(schedulerapi.HostPriorityList, 0, len(nodes))
        for i := range nodes {
            hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
            if err != nil {
                return nil, err
            }
            result = append(result, hostPriority)
        }
        return result, nil
    }

    var (
        mu   = sync.Mutex{}
        wg   = sync.WaitGroup{}
        errs []error
    )
    appendError := func(err error) {
        mu.Lock()
        defer mu.Unlock()
        errs = append(errs, err)
    }

    results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))
    ...
    workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
        nodeInfo := nodeNameToInfo[nodes[index].Name]
        for i := range priorityConfigs {
            if priorityConfigs[i].Function != nil {
                continue
            }

            var err error
            results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
            if err != nil {
                appendError(err)
                results[i][index].Host = nodes[index].Name
            }
        }
    })

    for i := range priorityConfigs {
        if priorityConfigs[i].Reduce == nil {
            continue
        }
        wg.Add(1)
        go func(index int) {
            defer wg.Done()
            if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
                appendError(err)
            }
            if klog.V(10) {
                for _, hostPriority := range results[index] {
                    klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Host, priorityConfigs[index].Name, hostPriority.Score)
                }
            }
        }(i)
    }
    // Wait for all computations to be finished.
    wg.Wait()
    if len(errs) != 0 {
        return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
    }

    // Summarize all scores.
    result := make(schedulerapi.HostPriorityList, 0, len(nodes))

    for i := range nodes {
        result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
        for j := range priorityConfigs {
            result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
        }
    }

    if len(extenders) != 0 && nodes != nil {
        ...
    }

    if klog.V(10) {
        for i := range result {
            klog.Infof("Host %s => Score %d", result[i].Host, result[i].Score)
        }
    }
    return result, nil
}
Step one checks whether any priority configurations exist; if not, every node is given the same weight and an equal-priority node list is returned (i.e. the Score field of each HostPriority is set to 1).
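EqualPriorityMap itself is tiny; paraphrased, it simply assigns the same score to every node:

    // Paraphrased: every node gets a score of 1, so they are all treated as equal.
    func EqualPriorityMap(_ *v1.Pod, _ interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (schedulerapi.HostPriority, error) {
        node := nodeInfo.Node()
        if node == nil {
            return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
        }
        return schedulerapi.HostPriority{Host: node.Name, Score: 1}, nil
    }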
Step two computes each Node's score concurrently in a Map-Reduce style. As the code shows, the Map-Reduce phase iterates over the priorityConfigs slice, which holds a collection of PriorityConfig structs:
pkg/scheduler/algorithm/types.go

// PriorityConfig is a config used for a priority function.
type PriorityConfig struct {
    Name   string
    Map    PriorityMapFunction
    Reduce PriorityReduceFunction
    // TODO: Remove it after migrating all functions to
    // Map-Reduce pattern.
    Function PriorityFunction
    Weight   int
}
PriorityConfig is the struct that stores the information about a priority (scoring) algorithm. Each priority algorithm has its own Name, Map (its Map function), Reduce (its Reduce function) and Weight. All of the Map and Reduce functions are defined in the pkg/scheduler/algorithm/priorities package, and they are registered in the init function of pkg/scheduler/algorithmprovider/defaults/register_priorities.go.
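To make the pattern concrete, here is a small self-contained sketch (hypothetical names, not scheduler code) of a Map-Reduce style priority: Map produces a raw score per node independently, and Reduce then rescales the raw scores onto a common 0-10 range:

    package main

    import "fmt"

    type hostPriority struct {
        Host  string
        Score int
    }

    // Map: raw score = free "slots" on the node (higher is better).
    func leastPodsMap(podsOnNode map[string]int, host string, capacity int) hostPriority {
        return hostPriority{Host: host, Score: capacity - podsOnNode[host]}
    }

    // Reduce: normalize all raw scores so the best node gets 10.
    func leastPodsReduce(result []hostPriority) {
        max := 0
        for _, hp := range result {
            if hp.Score > max {
                max = hp.Score
            }
        }
        if max == 0 {
            return
        }
        for i := range result {
            result[i].Score = result[i].Score * 10 / max
        }
    }

    func main() {
        pods := map[string]int{"node-a": 3, "node-b": 8}
        result := []hostPriority{
            leastPodsMap(pods, "node-a", 10),
            leastPodsMap(pods, "node-b", 10),
        }
        leastPodsReduce(result)
        fmt.Println(result) // [{node-a 10} {node-b 2}]
    }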
Step three sums up the scores: each algorithm's score for a node is multiplied by that algorithm's weight, accumulated into result, and returned.
4. selectHost
selectHost is straightforward: it picks the node with the highest score from the scored list and returns its host name.
pkg/scheduler/core/generic_scheduler.go

// selectHost takes a prioritized list of nodes and then picks one
// in a round-robin manner from the nodes that had the highest score.
func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList) (string, error) {
    if len(priorityList) == 0 {
        return "", fmt.Errorf("empty priorityList")
    }

    maxScores := findMaxScores(priorityList)
    ix := int(g.lastNodeIndex % uint64(len(maxScores)))
    g.lastNodeIndex++

    return priorityList[maxScores[ix]].Host, nil
}
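findMaxScores is not shown above; roughly, it returns the indexes of every node that shares the top score, and lastNodeIndex then round-robins among those ties across scheduling cycles so equally good nodes get spread out. A paraphrased sketch:

    // Paraphrased: collect the indexes of all entries whose Score equals the maximum.
    func findMaxScores(priorityList schedulerapi.HostPriorityList) []int {
        maxScoreIndexes := make([]int, 0, len(priorityList)/2)
        maxScore := priorityList[0].Score
        for i, hp := range priorityList {
            if hp.Score > maxScore {
                maxScore = hp.Score
                maxScoreIndexes = maxScoreIndexes[:0]
                maxScoreIndexes = append(maxScoreIndexes, i)
            } else if hp.Score == maxScore {
                maxScoreIndexes = append(maxScoreIndexes, i)
            }
        }
        return maxScoreIndexes
    }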
5. Summary
Overall, the Scheduler's handling of a Pod comes down to four steps: take the next Pod, pick a target Node, assume the binding, then perform the actual binding. The Scheduler never runs the Pod itself; all it does is submit the Pod's post-scheduling update to the API Server (done in sched.bind).