  • Kubernetes Source Code Reading Notes: Scheduler (Part 1)

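    The Scheduler is a key component of the master node in a cluster. It assigns Pods to suitable nodes in the Kubernetes cluster based on each Pod's resource requirements, affinity rules, and other criteria.

    I. Entry Function

    The entry function has the same structure as that of the Controller Manager: it also uses the cobra package to register the kube-scheduler command on the command line.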

    cmd/kube-scheduler/scheduler.go

    func main() {
        rand.Seed(time.Now().UnixNano())

        command := app.NewSchedulerCommand()

        pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)
        logs.InitLogs()
        defer logs.FlushLogs()

        if err := command.Execute(); err != nil {
            fmt.Fprintf(os.Stderr, "%v\n", err)
            os.Exit(1)
        }
    }

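    The core here is again NewSchedulerCommand. It lives in app/server.go and is structured almost identically to its Controller Manager counterpart, so it is not reproduced here. The essential part is again that the Run field of the cobra.Command struct calls the runCommand method.

    runCommand builds the Config for the Scheduler and finally returns the result of Run, which starts the Scheduler.

    One line in the middle of runCommand deserves attention: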

    func runCommand(cmd *cobra.Command, args []string, opts *options.Options) error {
    
        ...
    
        algorithmprovider.ApplyFeatureGates()
    
        ...
        return Run(cc, stopCh)
    }

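    This line calls ApplyFeatureGates, which registers or removes the corresponding predicates (filtering policies) according to the Feature Gate configuration.

    Stepping into ApplyFeatureGates, we find that the method is a single line, and that it is the only method in the whole package: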

    pkg/scheduler/algorithmprovider/plugin.go
    
    package algorithmprovider
    
    import  "k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults"
    
    // ApplyFeatureGates applies algorithm by feature gates.
    func ApplyFeatureGates() {
        defaults.ApplyFeatureGates()
    }

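    In fact, pkg/scheduler/algorithmprovider/defaults/defaults.go contains an init function:

    pkg/scheduler/algorithmprovider/defaults/defaults.go

    func init() {
        registerAlgorithmProvider(defaultPredicates(), defaultPriorities())
    }

    So merely importing the defaults package already runs registerAlgorithmProvider, which registers a set of predicate (filtering) and priority (scoring) functions. ApplyFeatureGates then adds or removes some of these predicates and priorities depending on which Kubernetes features are enabled. These features are defined in pkg/features/kube_features.go.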

    The detailed definitions of the predicates and priorities are in pkg/scheduler/algorithm and pkg/scheduler/algorithmprovider and are not covered here.

    II. Run

    Now look at the Run method:

    func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
        // Create the scheduler.
        sched, err := scheduler.New(cc.Client,
            cc.InformerFactory.Core().V1().Nodes(),
            cc.PodInformer,
            cc.InformerFactory.Core().V1().PersistentVolumes(),
            cc.InformerFactory.Core().V1().PersistentVolumeClaims(),
            cc.InformerFactory.Core().V1().ReplicationControllers(),
            cc.InformerFactory.Apps().V1().ReplicaSets(),
            cc.InformerFactory.Apps().V1().StatefulSets(),
            cc.InformerFactory.Core().V1().Services(),
            cc.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
            cc.InformerFactory.Storage().V1().StorageClasses(),
            cc.Recorder,
            cc.ComponentConfig.AlgorithmSource,
            stopCh,
            scheduler.WithName(cc.ComponentConfig.SchedulerName),
            scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
            scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
            scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
            scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds))
        if err != nil {
            return err
        }
    
        // Prepare the event broadcaster.
        ...
    
        // Setup healthz checks.
        ...
    
        // Start all informers.
        go cc.PodInformer.Informer().Run(stopCh)
        cc.InformerFactory.Start(stopCh)
    
        // Wait for all caches to sync before scheduling.
        cc.InformerFactory.WaitForCacheSync(stopCh)
        controller.WaitForCacheSync("scheduler", stopCh, cc.PodInformer.Informer().HasSynced)
    
        // Prepare a reusable runCommand function.
        run := func(ctx context.Context) {
            sched.Run()
            <-ctx.Done()
        }
    
        ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here
        defer cancel()
    
        go func() {
            select {
            case <-stopCh:
                cancel()
            case <-ctx.Done():
            }
        }()
    
        // If leader election is enabled, runCommand via LeaderElector until done and exit.
        ...
    
        // Leader election is disabled, so runCommand inline until done.
        run(ctx)
        return fmt.Errorf("finished without leader elect")
    }

    Run does the following:

    (1) Create the Scheduler.

    The first few lines of Run call New to create a Scheduler object. New is located in pkg/scheduler/scheduler.go:

    pkg/scheduler/scheduler.go
    
    func New(client clientset.Interface,
        nodeInformer coreinformers.NodeInformer,
        podInformer coreinformers.PodInformer,
        pvInformer coreinformers.PersistentVolumeInformer,
        pvcInformer coreinformers.PersistentVolumeClaimInformer,
        replicationControllerInformer coreinformers.ReplicationControllerInformer,
        replicaSetInformer appsinformers.ReplicaSetInformer,
        statefulSetInformer appsinformers.StatefulSetInformer,
        serviceInformer coreinformers.ServiceInformer,
        pdbInformer policyinformers.PodDisruptionBudgetInformer,
        storageClassInformer storageinformers.StorageClassInformer,
        recorder record.EventRecorder,
        schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
        stopCh <-chan struct{},
        opts ...func(o *schedulerOptions)) (*Scheduler, error) {
    
        options := defaultSchedulerOptions
        for _, opt := range opts {
            opt(&options)
        }
    
        // Set up the configurator which can create schedulers from configs.
        configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
            SchedulerName:                  options.schedulerName,
            Client:                         client,
            NodeInformer:                   nodeInformer,
            PodInformer:                    podInformer,
            PvInformer:                     pvInformer,
            PvcInformer:                    pvcInformer,
            ReplicationControllerInformer:  replicationControllerInformer,
            ReplicaSetInformer:             replicaSetInformer,
            StatefulSetInformer:            statefulSetInformer,
            ServiceInformer:                serviceInformer,
            PdbInformer:                    pdbInformer,
            StorageClassInformer:           storageClassInformer,
            HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,
            DisablePreemption:              options.disablePreemption,
            PercentageOfNodesToScore:       options.percentageOfNodesToScore,
            BindTimeoutSeconds:             options.bindTimeoutSeconds,
        })
        var config *factory.Config
        source := schedulerAlgorithmSource
        switch {
        case source.Provider != nil:
            // Create the config from a named algorithm provider.
            ...
        case source.Policy != nil:
            // Create the config from a user specified policy source.
            ...
        default:
            return nil, fmt.Errorf("unsupported algorithm source: %v", source)
        }
        // Additional tweaks to the config produced by the configurator.
        config.Recorder = recorder
        config.DisablePreemption = options.disablePreemption
        config.StopEverything = stopCh
        // Create the scheduler.
        sched := NewFromConfig(config)
        return sched, nil
    }

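    The logic of New is fairly clear. In essence, it instantiates a Config from the Informers, algorithm source, and other arguments passed in, then calls NewFromConfig to create and return a scheduler instance from that Config. Note that the scheduler also uses a large number of Informers, including nodeInformer and podInformer, because it too has to keep track of resource changes promptly in order to adjust its scheduling decisions.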

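    The switch block in the middle checks whether the scheduling algorithm source is a user-defined policy or a named provider. With the default provider, the predicates and priorities registered earlier are loaded.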

    NewConfigFactory, which creates the configurator that produces the config, is in pkg/scheduler/factory/factory.go. Stepping into it:

    pkg/scheduler/factory/factory.go
    
    // NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only
    // return the interface.
    func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
       stopEverything := args.StopCh
       if stopEverything == nil {
          stopEverything = wait.NeverStop
       }
       schedulerCache := schedulerinternalcache.New(30*time.Second, stopEverything)
    
       // storageClassInformer is only enabled through VolumeScheduling feature gate
       var storageClassLister storagelisters.StorageClassLister
       if args.StorageClassInformer != nil {
          storageClassLister = args.StorageClassInformer.Lister()
       }
       c := &configFactory{
          client:                         args.Client,
          podLister:                      schedulerCache,
          podQueue:                       internalqueue.NewSchedulingQueue(stopEverything),
          nodeLister:                     args.NodeInformer.Lister(),
          pVLister:                       args.PvInformer.Lister(),
          pVCLister:                      args.PvcInformer.Lister(),
          serviceLister:                  args.ServiceInformer.Lister(),
          controllerLister:               args.ReplicationControllerInformer.Lister(),
          replicaSetLister:               args.ReplicaSetInformer.Lister(),
          statefulSetLister:              args.StatefulSetInformer.Lister(),
          pdbLister:                      args.PdbInformer.Lister(),
          storageClassLister:             storageClassLister,
          schedulerCache:                 schedulerCache,
          StopEverything:                 stopEverything,
          schedulerName:                  args.SchedulerName,
          hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
          disablePreemption:              args.DisablePreemption,
          percentageOfNodesToScore:       args.PercentageOfNodesToScore,
       }
    
       c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced
       // scheduled pod cache
       args.PodInformer.Informer().AddEventHandler(
          cache.FilteringResourceEventHandler{
             FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                   return assignedPod(t)
                case cache.DeletedFinalStateUnknown:
                   if pod, ok := t.Obj.(*v1.Pod); ok {
                      return assignedPod(pod)
                   }
                   runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
                   return false
                default:
                   runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
                   return false
                }
             },
             Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    c.addPodToCache,
                UpdateFunc: c.updatePodInCache,
                DeleteFunc: c.deletePodFromCache,
             },
          },
       )
       // unscheduled pod queue
       args.PodInformer.Informer().AddEventHandler(
          cache.FilteringResourceEventHandler{
             FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                   return !assignedPod(t) && responsibleForPod(t, args.SchedulerName)
                case cache.DeletedFinalStateUnknown:
                   if pod, ok := t.Obj.(*v1.Pod); ok {
                      return !assignedPod(pod) && responsibleForPod(pod, args.SchedulerName)
                   }
                   runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
                   return false
                default:
                   runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
                   return false
                }
             },
             Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    c.addPodToSchedulingQueue,
                UpdateFunc: c.updatePodInSchedulingQueue,
                DeleteFunc: c.deletePodFromSchedulingQueue,
             },
          },
       )
       // ScheduledPodLister is something we provide to plug-in functions that
       // they may need to call.
       c.scheduledPodLister = assignedPodLister{args.PodInformer.Lister()}
    
       args.NodeInformer.Informer().AddEventHandler(
          cache.ResourceEventHandlerFuncs{
             AddFunc:    c.addNodeToCache,
             UpdateFunc: c.updateNodeInCache,
             DeleteFunc: c.deleteNodeFromCache,
          },
       )
    
       args.PvInformer.Informer().AddEventHandler(
          cache.ResourceEventHandlerFuncs{
             // MaxPDVolumeCountPredicate: since it relies on the counts of PV.
             AddFunc:    c.onPvAdd,
             UpdateFunc: c.onPvUpdate,
          },
       )
    
       // This is for MaxPDVolumeCountPredicate: add/delete PVC will affect counts of PV when it is bound.
       args.PvcInformer.Informer().AddEventHandler(
          cache.ResourceEventHandlerFuncs{
             AddFunc:    c.onPvcAdd,
             UpdateFunc: c.onPvcUpdate,
          },
       )
    
       // This is for ServiceAffinity: affected by the selector of the service is updated.
       args.ServiceInformer.Informer().AddEventHandler(
          cache.ResourceEventHandlerFuncs{
             AddFunc:    c.onServiceAdd,
             UpdateFunc: c.onServiceUpdate,
             DeleteFunc: c.onServiceDelete,
          },
       )
    
       // Setup volume binder
       c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.NodeInformer, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
    
       args.StorageClassInformer.Informer().AddEventHandler(
          cache.ResourceEventHandlerFuncs{
             AddFunc: c.onStorageClassAdd,
          },
       )
    
       // Setup cache debugger
       ...
    
       go func() {
          <-c.StopEverything
          c.podQueue.Close()
       }()
    
       return c
    }

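    This method sets up callbacks for a series of Informers. The most important are the two callbacks on the PodInformer.

    The method calls AddEventHandler on the PodInformer twice, each time with a filter. The first handler processes only Pods that are already scheduled, and the second only Pods that are not yet scheduled (and that this scheduler is responsible for). Each defines add, update, and delete callbacks that update the cache and the scheduling queue respectively. This is how scheduled and unscheduled Pods are kept apart.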

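    As for the callbacks added for the other informers below, the NodeInformer callbacks update node information in the cache, while the rest all eventually call MoveAllToActiveQueue, which moves Pods waiting to be scheduled into the active queue.

    Note also that ConfigFactory has a podQueue field that maintains a queue holding the Pods waiting to be scheduled.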

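    (2) Set up event broadcasting and health checks.

    The next few lines configure the event broadcaster and health checks for the Scheduler. They are similar to the Controller Manager and are skipped here.

    (3) Start the Informers.

    Notably, the Scheduler starts the PodInformer separately from the other Informers, because scheduling Pods is the Scheduler's core job.

    (4) Run the Scheduler.

    This is the core of the whole method: calling the Scheduler's Run method starts the Scheduler.

    Stepping into Run, we find it is very concise and does just two things: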

    pkg/scheduler/scheduler.go
    
    func (sched *Scheduler) Run() {
    	if !sched.config.WaitForCacheSync() {
    		return
    	}
    
    	go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
    }

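    First, it waits for the informer caches to sync. Second, it calls scheduleOne to perform Pod scheduling. wait.Until runs sched.scheduleOne repeatedly at a fixed interval until sched.config.StopEverything is closed. Here the interval is 0, so scheduleOne is called back to back without pause.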
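    The behaviour of a zero-period loop that stops when a channel is closed can be sketched with the standard library alone. The until function below is a simplified stand-in written for illustration, not the implementation of wait.Until, and the counter in main is an assumption used only to make the loop terminate.

```go
package main

import (
	"fmt"
	"time"
)

// until calls f repeatedly, waiting period between calls, and returns
// once stopCh is closed. With period == 0 the calls run back to back.
func until(f func(), period time.Duration, stopCh <-chan struct{}) {
	for {
		// Check for a stop signal before every call.
		select {
		case <-stopCh:
			return
		default:
		}

		f()

		if period > 0 {
			// Wait for the next tick, but wake up early on stop.
			select {
			case <-stopCh:
				return
			case <-time.After(period):
			}
		}
	}
}

func main() {
	stop := make(chan struct{})
	count := 0
	// The callback closes the stop channel on its third invocation,
	// so the loop exits before a fourth call.
	until(func() {
		count++
		if count == 3 {
			close(stop)
		}
	}, 0, stop)
	fmt.Println("calls:", count)
}
```

    In the scheduler, the loop is launched with the go keyword, so Run returns immediately and the caller blocks on the context instead.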

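    The detailed logic of scheduleOne is analyzed in the next article: https://www.cnblogs.com/00986014w/p/10320844.html

    III. Summary

    To summarize, the Scheduler registers and runs a kube-scheduler command through cobra. When the command runs, it first applies the given scheduling algorithms, then creates a Scheduler instance based on ConfigFactory, starts the relevant Informers, and begins scheduling.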

  • Original article: https://www.cnblogs.com/00986014w/p/10305425.html