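The Scheduler is a key component of the Master node in a cluster. Based on each Pod's resource requirements, affinity rules, and other criteria, it assigns Pods to suitable nodes in the Kubernetes cluster.
I. Entry Function
The entry function has the same structure as the Controller Manager's: it also uses the cobra package to register the kube-scheduler command on the command line.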
cmd/kube-scheduler/scheduler.go
func main() {
	rand.Seed(time.Now().UnixNano())
	command := app.NewSchedulerCommand()
	pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)
	logs.InitLogs()
	defer logs.FlushLogs()
	if err := command.Execute(); err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}
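The core method here is again NewSchedulerCommand. It lives in app/server.go and is structured almost identically to its Controller Manager counterpart, so it is not reproduced here. The key part is still the Run field of the cobra.Command struct, which calls runCommand.
runCommand builds the Config for the Scheduler and finally returns the result of Run, which starts the Scheduler.
One line in the middle of runCommand deserves attention: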
func runCommand(cmd *cobra.Command, args []string, opts *options.Options) error {
	...
	algorithmprovider.ApplyFeatureGates()
	...
	return Run(cc, stopCh)
}
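This line calls ApplyFeatureGates, which registers or removes predicates (the filtering policies) according to the Feature Gate configuration.
Stepping into ApplyFeatureGates, we find that the method is a single line and that it is the only method in the whole package: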
pkg/scheduler/algorithmprovider/plugin.go
package algorithmprovider
import "k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults"
// ApplyFeatureGates applies algorithm by feature gates.
func ApplyFeatureGates() {
	defaults.ApplyFeatureGates()
}
In fact, pkg/scheduler/algorithmprovider/defaults/defaults.go contains an init function:
pkg/scheduler/algorithmprovider/defaults/defaults.go
func init() {
	registerAlgorithmProvider(defaultPredicates(), defaultPriorities())
}
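So as soon as the defaults package is imported, registerAlgorithmProvider has already run and registered a set of predicates (filtering) and priorities (scoring). ApplyFeatureGates then adds or removes some of these predicates and priorities depending on which Kubernetes features are enabled. The features themselves are defined in pkg/features/kube_features.go.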
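The two-step pattern described above (register defaults in init, then adjust by feature gate) can be sketched in a few lines. This is a minimal illustration under stated assumptions, not the scheduler's code: predicateRegistry, applyFeatureGates, and registeredPredicates are hypothetical names, and the predicate and gate names are borrowed only as examples.

```go
package main

import (
	"fmt"
	"sort"
)

// predicateRegistry is a hypothetical stand-in for the scheduler's set of
// registered predicates.
var predicateRegistry = map[string]bool{}

// init runs when the package is loaded, mirroring how importing the
// defaults package registers the default predicates as a side effect.
func init() {
	for _, name := range []string{"PodFitsResources", "CheckNodeCondition"} {
		predicateRegistry[name] = true
	}
}

// applyFeatureGates adjusts the registry after registration. Which gate
// toggles which predicate is an illustrative assumption here.
func applyFeatureGates(gates map[string]bool) {
	if gates["TaintNodesByCondition"] {
		delete(predicateRegistry, "CheckNodeCondition")
		predicateRegistry["PodToleratesNodeTaints"] = true
	}
}

// registeredPredicates returns the registered names in sorted order.
func registeredPredicates() []string {
	names := make([]string, 0, len(predicateRegistry))
	for name := range predicateRegistry {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

func main() {
	fmt.Println("before:", registeredPredicates())
	applyFeatureGates(map[string]bool{"TaintNodesByCondition": true})
	fmt.Println("after: ", registeredPredicates())
}
```

Because registration happens in init, the caller never registers the defaults explicitly; it only needs the import and the later call that applies the gates.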
The detailed definitions of the predicates and priorities are in pkg/scheduler/algorithm and pkg/scheduler/algorithmprovider; they are not covered here.
II. Run
Let's look at the Run method:
func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
	// Create the scheduler.
	sched, err := scheduler.New(cc.Client,
		cc.InformerFactory.Core().V1().Nodes(),
		cc.PodInformer,
		cc.InformerFactory.Core().V1().PersistentVolumes(),
		cc.InformerFactory.Core().V1().PersistentVolumeClaims(),
		cc.InformerFactory.Core().V1().ReplicationControllers(),
		cc.InformerFactory.Apps().V1().ReplicaSets(),
		cc.InformerFactory.Apps().V1().StatefulSets(),
		cc.InformerFactory.Core().V1().Services(),
		cc.InformerFactory.Policy().V1beta1().PodDisruptionBudgets(),
		cc.InformerFactory.Storage().V1().StorageClasses(),
		cc.Recorder,
		cc.ComponentConfig.AlgorithmSource,
		stopCh,
		scheduler.WithName(cc.ComponentConfig.SchedulerName),
		scheduler.WithHardPodAffinitySymmetricWeight(cc.ComponentConfig.HardPodAffinitySymmetricWeight),
		scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
		scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
		scheduler.WithBindTimeoutSeconds(*cc.ComponentConfig.BindTimeoutSeconds))
	if err != nil {
		return err
	}
	// Prepare the event broadcaster.
	...
	// Setup healthz checks.
	...
	// Start all informers.
	go cc.PodInformer.Informer().Run(stopCh)
	cc.InformerFactory.Start(stopCh)
	// Wait for all caches to sync before scheduling.
	cc.InformerFactory.WaitForCacheSync(stopCh)
	controller.WaitForCacheSync("scheduler", stopCh, cc.PodInformer.Informer().HasSynced)
	// Prepare a reusable runCommand function.
	run := func(ctx context.Context) {
		sched.Run()
		<-ctx.Done()
	}
	ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here
	defer cancel()
	go func() {
		select {
		case <-stopCh:
			cancel()
		case <-ctx.Done():
		}
	}()
	// If leader election is enabled, runCommand via LeaderElector until done and exit.
	...
	// Leader election is disabled, so runCommand inline until done.
	run(ctx)
	return fmt.Errorf("finished without leader elect")
}
Run mainly does the following:
(1) Create the Scheduler.
The first few lines of Run call New to create a Scheduler object. New is located in pkg/scheduler/scheduler.go:
pkg/scheduler/scheduler.go
func New(client clientset.Interface,
	nodeInformer coreinformers.NodeInformer,
	podInformer coreinformers.PodInformer,
	pvInformer coreinformers.PersistentVolumeInformer,
	pvcInformer coreinformers.PersistentVolumeClaimInformer,
	replicationControllerInformer coreinformers.ReplicationControllerInformer,
	replicaSetInformer appsinformers.ReplicaSetInformer,
	statefulSetInformer appsinformers.StatefulSetInformer,
	serviceInformer coreinformers.ServiceInformer,
	pdbInformer policyinformers.PodDisruptionBudgetInformer,
	storageClassInformer storageinformers.StorageClassInformer,
	recorder record.EventRecorder,
	schedulerAlgorithmSource kubeschedulerconfig.SchedulerAlgorithmSource,
	stopCh <-chan struct{},
	opts ...func(o *schedulerOptions)) (*Scheduler, error) {
	options := defaultSchedulerOptions
	for _, opt := range opts {
		opt(&options)
	}
	// Set up the configurator which can create schedulers from configs.
	configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
		SchedulerName:                  options.schedulerName,
		Client:                         client,
		NodeInformer:                   nodeInformer,
		PodInformer:                    podInformer,
		PvInformer:                     pvInformer,
		PvcInformer:                    pvcInformer,
		ReplicationControllerInformer:  replicationControllerInformer,
		ReplicaSetInformer:             replicaSetInformer,
		StatefulSetInformer:            statefulSetInformer,
		ServiceInformer:                serviceInformer,
		PdbInformer:                    pdbInformer,
		StorageClassInformer:           storageClassInformer,
		HardPodAffinitySymmetricWeight: options.hardPodAffinitySymmetricWeight,
		DisablePreemption:              options.disablePreemption,
		PercentageOfNodesToScore:       options.percentageOfNodesToScore,
		BindTimeoutSeconds:             options.bindTimeoutSeconds,
	})
	var config *factory.Config
	source := schedulerAlgorithmSource
	switch {
	case source.Provider != nil:
		// Create the config from a named algorithm provider.
		...
	case source.Policy != nil:
		// Create the config from a user specified policy source.
		...
	default:
		return nil, fmt.Errorf("unsupported algorithm source: %v", source)
	}
	// Additional tweaks to the config produced by the configurator.
	config.Recorder = recorder
	config.DisablePreemption = options.disablePreemption
	config.StopEverything = stopCh
	// Create the scheduler.
	sched := NewFromConfig(config)
	return sched, nil
}
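The logic of New is fairly clear. In essence, it builds a Config from the Informers, algorithm source, and other arguments passed in, then calls NewFromConfig to create a scheduler instance from that Config and returns it. Note that the scheduler also uses a large number of Informers, including nodeInformer and podInformer, because it too needs to learn about resource changes promptly in order to adjust its scheduling decisions.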
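The trailing opts ...func(o *schedulerOptions) parameter of New is the functional options pattern: start from a default struct and let each option mutate a copy. The sketch below is a reduced illustration, assuming only two fields and a hypothetical newOptions helper; the default name used here is also an assumption for the example.

```go
package main

import "fmt"

// schedulerOptions holds tunables; this is a reduced, illustrative subset.
type schedulerOptions struct {
	schedulerName     string
	disablePreemption bool
}

// Option mutates a schedulerOptions value.
type Option func(o *schedulerOptions)

// WithName overrides the scheduler name.
func WithName(name string) Option {
	return func(o *schedulerOptions) { o.schedulerName = name }
}

// WithPreemptionDisabled toggles preemption.
func WithPreemptionDisabled(disabled bool) Option {
	return func(o *schedulerOptions) { o.disablePreemption = disabled }
}

// defaultSchedulerOptions is the starting point for every call.
var defaultSchedulerOptions = schedulerOptions{schedulerName: "default-scheduler"}

// newOptions (hypothetical helper) copies the defaults and applies each
// option in order, so later options win over earlier ones.
func newOptions(opts ...Option) schedulerOptions {
	options := defaultSchedulerOptions // struct copy; defaults stay untouched
	for _, opt := range opts {
		opt(&options)
	}
	return options
}

func main() {
	o := newOptions(WithName("my-scheduler"), WithPreemptionDisabled(true))
	fmt.Printf("%+v\n", o)
}
```

The benefit of this style is that new tunables can be added without changing the signature of the constructor or any existing caller.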
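The switch block in the middle checks whether the algorithm source is a user-defined policy or a named provider. When the default provider is used, the predicates and priorities registered earlier are loaded.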
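The branch on the algorithm source can be pictured as follows. This is a simplified sketch, not the elided code from New: algorithmSource, policySource, and predicatesFor are hypothetical types and helpers, and the providers map stands in for whatever was registered earlier.

```go
package main

import (
	"errors"
	"fmt"
)

// policySource is a hypothetical user-supplied policy listing predicates.
type policySource struct {
	Predicates []string
}

// algorithmSource mirrors the shape used in the switch: exactly one of the
// two pointers is expected to be set.
type algorithmSource struct {
	Provider *string
	Policy   *policySource
}

// predicatesFor resolves the predicate list from a named provider or from
// a user policy, and rejects a source that sets neither.
func predicatesFor(source algorithmSource, providers map[string][]string) ([]string, error) {
	switch {
	case source.Provider != nil:
		preds, ok := providers[*source.Provider]
		if !ok {
			return nil, fmt.Errorf("unknown provider %q", *source.Provider)
		}
		return preds, nil
	case source.Policy != nil:
		return source.Policy.Predicates, nil
	default:
		return nil, errors.New("unsupported algorithm source")
	}
}

func main() {
	providers := map[string][]string{"DefaultProvider": {"PodFitsResources"}}
	name := "DefaultProvider"
	preds, err := predicatesFor(algorithmSource{Provider: &name}, providers)
	fmt.Println(preds, err)
}
```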
NewConfigFactory, which creates the configurator used to build the config, is in pkg/scheduler/factory/factory.go. Stepping into it:
pkg/scheduler/factory/factory.go
// NewConfigFactory initializes the default implementation of a Configurator. To encourage eventual privatization of the struct type, we only
// return the interface.
func NewConfigFactory(args *ConfigFactoryArgs) Configurator {
	stopEverything := args.StopCh
	if stopEverything == nil {
		stopEverything = wait.NeverStop
	}
	schedulerCache := schedulerinternalcache.New(30*time.Second, stopEverything)
	// storageClassInformer is only enabled through VolumeScheduling feature gate
	var storageClassLister storagelisters.StorageClassLister
	if args.StorageClassInformer != nil {
		storageClassLister = args.StorageClassInformer.Lister()
	}
	c := &configFactory{
		client:                         args.Client,
		podLister:                      schedulerCache,
		podQueue:                       internalqueue.NewSchedulingQueue(stopEverything),
		nodeLister:                     args.NodeInformer.Lister(),
		pVLister:                       args.PvInformer.Lister(),
		pVCLister:                      args.PvcInformer.Lister(),
		serviceLister:                  args.ServiceInformer.Lister(),
		controllerLister:               args.ReplicationControllerInformer.Lister(),
		replicaSetLister:               args.ReplicaSetInformer.Lister(),
		statefulSetLister:              args.StatefulSetInformer.Lister(),
		pdbLister:                      args.PdbInformer.Lister(),
		storageClassLister:             storageClassLister,
		schedulerCache:                 schedulerCache,
		StopEverything:                 stopEverything,
		schedulerName:                  args.SchedulerName,
		hardPodAffinitySymmetricWeight: args.HardPodAffinitySymmetricWeight,
		disablePreemption:              args.DisablePreemption,
		percentageOfNodesToScore:       args.PercentageOfNodesToScore,
	}
	c.scheduledPodsHasSynced = args.PodInformer.Informer().HasSynced
	// scheduled pod cache
	args.PodInformer.Informer().AddEventHandler(
		cache.FilteringResourceEventHandler{
			FilterFunc: func(obj interface{}) bool {
				switch t := obj.(type) {
				case *v1.Pod:
					return assignedPod(t)
				case cache.DeletedFinalStateUnknown:
					if pod, ok := t.Obj.(*v1.Pod); ok {
						return assignedPod(pod)
					}
					runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
					return false
				default:
					runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
					return false
				}
			},
			Handler: cache.ResourceEventHandlerFuncs{
				AddFunc:    c.addPodToCache,
				UpdateFunc: c.updatePodInCache,
				DeleteFunc: c.deletePodFromCache,
			},
		},
	)
	// unscheduled pod queue
	args.PodInformer.Informer().AddEventHandler(
		cache.FilteringResourceEventHandler{
			FilterFunc: func(obj interface{}) bool {
				switch t := obj.(type) {
				case *v1.Pod:
					return !assignedPod(t) && responsibleForPod(t, args.SchedulerName)
				case cache.DeletedFinalStateUnknown:
					if pod, ok := t.Obj.(*v1.Pod); ok {
						return !assignedPod(pod) && responsibleForPod(pod, args.SchedulerName)
					}
					runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c))
					return false
				default:
					runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
					return false
				}
			},
			Handler: cache.ResourceEventHandlerFuncs{
				AddFunc:    c.addPodToSchedulingQueue,
				UpdateFunc: c.updatePodInSchedulingQueue,
				DeleteFunc: c.deletePodFromSchedulingQueue,
			},
		},
	)
	// ScheduledPodLister is something we provide to plug-in functions that
	// they may need to call.
	c.scheduledPodLister = assignedPodLister{args.PodInformer.Lister()}
	args.NodeInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    c.addNodeToCache,
			UpdateFunc: c.updateNodeInCache,
			DeleteFunc: c.deleteNodeFromCache,
		},
	)
	args.PvInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			// MaxPDVolumeCountPredicate: since it relies on the counts of PV.
			AddFunc:    c.onPvAdd,
			UpdateFunc: c.onPvUpdate,
		},
	)
	// This is for MaxPDVolumeCountPredicate: add/delete PVC will affect counts of PV when it is bound.
	args.PvcInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    c.onPvcAdd,
			UpdateFunc: c.onPvcUpdate,
		},
	)
	// This is for ServiceAffinity: affected by the selector of the service is updated.
	args.ServiceInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    c.onServiceAdd,
			UpdateFunc: c.onServiceUpdate,
			DeleteFunc: c.onServiceDelete,
		},
	)
	// Setup volume binder
	c.volumeBinder = volumebinder.NewVolumeBinder(args.Client, args.NodeInformer, args.PvcInformer, args.PvInformer, args.StorageClassInformer, time.Duration(args.BindTimeoutSeconds)*time.Second)
	args.StorageClassInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc: c.onStorageClassAdd,
		},
	)
	// Setup cache debugger
	...
	go func() {
		<-c.StopEverything
		c.podQueue.Close()
	}()
	return c
}
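This method sets up callbacks for a series of Informers. The most important are the two callbacks on the PodInformer.
As the code shows, AddEventHandler is called twice on the PodInformer, each time with a filter. The first handler only processes Pods that are already scheduled; the second only processes unscheduled Pods that this scheduler is responsible for. Each defines add, update, and delete functions that update the cache and the scheduling queue respectively. This keeps scheduled and unscheduled Pods separate.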
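The routing effect of the two filtered handlers can be reproduced without any Kubernetes types. The sketch below is a simplified model under stated assumptions: pod, filteringHandler, and podRouter are hypothetical, "assigned" is taken to mean a non-empty node name, and only the add path is modeled.

```go
package main

import "fmt"

// pod is a hypothetical, reduced view of a Pod.
type pod struct {
	Name          string
	NodeName      string // empty until the Pod is bound to a node
	SchedulerName string
}

// assigned reports whether the Pod has already been placed on a node.
func assigned(p pod) bool { return p.NodeName != "" }

// filteringHandler invokes onAdd only for Pods that pass filter.
type filteringHandler struct {
	filter func(pod) bool
	onAdd  func(pod)
}

func (h filteringHandler) handleAdd(p pod) {
	if h.filter(p) {
		h.onAdd(p)
	}
}

// podRouter keeps scheduled Pods in a cache and pending Pods in a queue.
type podRouter struct {
	cache    map[string]string // pod name -> node name
	queue    []string          // names of Pods waiting to be scheduled
	handlers []filteringHandler
}

func newPodRouter(schedulerName string) *podRouter {
	r := &podRouter{cache: map[string]string{}}
	r.handlers = []filteringHandler{
		{ // scheduled Pods go to the cache
			filter: assigned,
			onAdd:  func(p pod) { r.cache[p.Name] = p.NodeName },
		},
		{ // unscheduled Pods owned by this scheduler go to the queue
			filter: func(p pod) bool { return !assigned(p) && p.SchedulerName == schedulerName },
			onAdd:  func(p pod) { r.queue = append(r.queue, p.Name) },
		},
	}
	return r
}

// observe delivers one add event to every registered handler.
func (r *podRouter) observe(p pod) {
	for _, h := range r.handlers {
		h.handleAdd(p)
	}
}

func main() {
	r := newPodRouter("default-scheduler")
	r.observe(pod{Name: "web-1", NodeName: "node-a", SchedulerName: "default-scheduler"})
	r.observe(pod{Name: "web-2", SchedulerName: "default-scheduler"})
	r.observe(pod{Name: "batch-1", SchedulerName: "other-scheduler"})
	fmt.Println("cache:", r.cache, "queue:", r.queue)
}
```

Since the two filters are mutually exclusive, every event lands in at most one of the two structures, and Pods owned by another scheduler are ignored until they are bound to a node.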
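As for the callbacks added afterwards for the other informers: the NodeInformer callbacks update node information in the cache, while the rest all end up calling MoveAllToActiveQueue, which puts Pods waiting to be scheduled into the queue.
Also note that ConfigFactory has a podQueue field, which maintains a queue holding the Pods waiting to be scheduled.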
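To make the role of the queue concrete, here is a minimal sketch of a queue with a blocking Pop and a "move everything back to active" operation. It assumes a two-list design (active and unschedulable); the type and method names are hypothetical and the real podQueue is more elaborate.

```go
package main

import (
	"fmt"
	"sync"
)

// schedulingQueue is a hypothetical, simplified queue: Pods that can be
// tried now sit in active, Pods that failed scheduling sit in unschedulable.
type schedulingQueue struct {
	mu            sync.Mutex
	cond          *sync.Cond
	active        []string
	unschedulable []string
	closed        bool
}

func newSchedulingQueue() *schedulingQueue {
	q := &schedulingQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Add appends a Pod to the active list and wakes any waiting Pop.
func (q *schedulingQueue) Add(pod string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.active = append(q.active, pod)
	q.cond.Broadcast()
}

// AddUnschedulable parks a Pod that could not be placed.
func (q *schedulingQueue) AddUnschedulable(pod string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.unschedulable = append(q.unschedulable, pod)
}

// MoveAllToActive makes every parked Pod eligible again, for example
// after a cluster change that might let it fit.
func (q *schedulingQueue) MoveAllToActive() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.active = append(q.active, q.unschedulable...)
	q.unschedulable = nil
	q.cond.Broadcast()
}

// Pop blocks until a Pod is available or the queue is closed.
func (q *schedulingQueue) Pop() (string, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.active) == 0 && !q.closed {
		q.cond.Wait()
	}
	if len(q.active) == 0 {
		return "", false
	}
	pod := q.active[0]
	q.active = q.active[1:]
	return pod, true
}

// Close unblocks all waiting Pop calls.
func (q *schedulingQueue) Close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.closed = true
	q.cond.Broadcast()
}

func main() {
	q := newSchedulingQueue()
	q.AddUnschedulable("pod-a")
	q.Add("pod-b")
	first, _ := q.Pop()
	q.MoveAllToActive() // e.g. triggered by a PV or Service event
	second, _ := q.Pop()
	fmt.Println(first, second)
}
```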
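(2) Set up the event broadcaster and health checks.
A few lines in the middle configure the event broadcaster and health checks for the Scheduler. This is similar to the Controller Manager and is not covered here.
(3) Start the Informers.
Note that the Scheduler starts the PodInformer separately from the other Informers, because scheduling Pods is the Scheduler's core job.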
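After starting the informers, Run waits for their caches to sync before scheduling anything. The idea of that wait can be sketched as a polling loop over "has synced" functions. This is a stand-in written against the standard library only; waitForSync and its interval parameter are assumptions, not the client-go implementation.

```go
package main

import (
	"fmt"
	"time"
)

// waitForSync polls every synced function until all report true, or until
// stopCh is closed. It returns false if it was stopped before syncing.
func waitForSync(stopCh <-chan struct{}, interval time.Duration, synced ...func() bool) bool {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		allSynced := true
		for _, hasSynced := range synced {
			if !hasSynced() {
				allSynced = false
				break
			}
		}
		if allSynced {
			return true
		}
		select {
		case <-stopCh:
			return false
		case <-ticker.C:
			// Poll again on the next tick.
		}
	}
}

func main() {
	stopCh := make(chan struct{})
	calls := 0
	// podsSynced pretends the pod cache needs three polls to fill.
	podsSynced := func() bool { calls++; return calls >= 3 }
	nodesSynced := func() bool { return true }
	fmt.Println("synced:", waitForSync(stopCh, time.Millisecond, podsSynced, nodesSynced))
}
```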
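(4) Run the Scheduler.
This is the core of the whole method: calling the Scheduler's Run method starts the Scheduler.
Stepping into Run, we find that the method is very short and does just two things: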
pkg/scheduler/scheduler.go
func (sched *Scheduler) Run() {
	if !sched.config.WaitForCacheSync() {
		return
	}
	go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}
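The first is to wait for the informers' caches to sync; the second is to call scheduleOne, which performs the scheduling of a Pod. wait.Until runs sched.scheduleOne repeatedly at a given interval until sched.config.StopEverything is closed. The interval is set to 0 here, so scheduleOne is called back to back without pause.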
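The looping behavior described above can be imitated with the standard library. The until function below is a simplified stand-in for wait.Until, written only to show the shape of the loop; it is not the apimachinery implementation, and the names are assumptions.

```go
package main

import (
	"fmt"
	"time"
)

// until calls f repeatedly, sleeping period between calls, and returns
// once stopCh is closed. With period 0 the calls run back to back.
func until(f func(), period time.Duration, stopCh <-chan struct{}) {
	for {
		// Check for stop before each call so a closed channel wins.
		select {
		case <-stopCh:
			return
		default:
		}
		f()
		select {
		case <-stopCh:
			return
		case <-time.After(period):
		}
	}
}

func main() {
	stopCh := make(chan struct{})
	scheduled := 0
	// scheduleOne stands in for one scheduling cycle; it stops the loop
	// after three Pods so the example terminates.
	scheduleOne := func() {
		scheduled++
		if scheduled == 3 {
			close(stopCh)
		}
	}
	until(scheduleOne, 0, stopCh)
	fmt.Println("cycles run:", scheduled)
}
```

In the scheduler the loop is started with go, so Run returns immediately and the caller blocks elsewhere until it is told to stop.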
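The logic of scheduleOne itself is analyzed in the next article: https://www.cnblogs.com/00986014w/p/10320844.html
III. Summary
To summarize, the Scheduler registers and runs a kube-scheduler command through cobra. When the command runs, it first applies the given scheduling algorithms, then creates a Scheduler instance based on ConfigFactory, starts the relevant Informers, and finally begins scheduling.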