zoukankan      html  css  js  c++  java
  • kube-batch ——pod 和task

    pkg/scheduler/scheduler.go:67

    相应的资源 Informer 开始 List-Watch 监听事件变化

    // Run launches every informer owned by the cache, followed by the two
    // background workers. Each of them is started in its own goroutine and
    // receives stopCh.
    func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
        // Informers that are always started, listed in launch order.
        starters := []func(<-chan struct{}){
            sc.pdbInformer.Informer().Run,
            sc.podInformer.Informer().Run,
            sc.nodeInformer.Informer().Run,
            sc.podGroupInformerv1alpha1.Informer().Run,
            sc.podGroupInformerv1alpha2.Informer().Run,
            sc.pvInformer.Informer().Run,
            sc.pvcInformer.Informer().Run,
            sc.scInformer.Informer().Run,
            sc.queueInformerv1alpha1.Informer().Run,
            sc.queueInformerv1alpha2.Informer().Run,
        }
    
        // The PriorityClass informer is started only when the server options
        // enable it.
        if options.ServerOpts.EnablePriorityClass {
            starters = append(starters, sc.pcInformer.Informer().Run)
        }
    
        for _, start := range starters {
            go start(stopCh)
        }
    
        // Re-sync error tasks.
        go wait.Until(sc.processResyncTask, 0, stopCh)
    
        // Cleanup jobs.
        go wait.Until(sc.processCleanupJob, 0, stopCh)
    }

    sc.podInformer

            // create informer for pod information
            sc.podInformer = informerFactory.Core().V1().Pods()
            sc.podInformer.Informer().AddEventHandler(
                    cache.FilteringResourceEventHandler{
                            FilterFunc: func(obj interface{}) bool {
                                    switch obj.(type) {
                                    case *v1.Pod:
                                            pod := obj.(*v1.Pod)
                                            if strings.Compare(pod.Spec.SchedulerName, schedulerName) == 0 && pod.Status.Phase == v1.PodPending {
                                                    return true
                                            }
                                            return pod.Status.Phase != v1.PodPending
                                    default:
                                            return false
                                    }
                            },
                            Handler: cache.ResourceEventHandlerFuncs{
                                    AddFunc:    sc.AddPod,
                                    UpdateFunc: sc.UpdatePod,
                                    DeleteFunc: sc.DeletePod,
                            },
                    })

    这里可以看到,kube-batch只关心两类Pod:需要自己调度且处于Pending状态的Pod;以及所有已不处于Pending状态的Pod(例如Running的Pod,不论由哪个调度器调度)。

    kube-batch/pkg/scheduler/cache/event_handlers.go

    // AddPod is registered as the pod informer's AddFunc. It stores the given
    // pod in the cache while holding sc.Mutex.
    func (sc *SchedulerCache) AddPod(obj interface{}) {
        // The informer hands over an untyped object; anything that is not a
        // *v1.Pod cannot be cached and is ignored instead of panicking.
        pod, ok := obj.(*v1.Pod)
        if !ok {
            return
        }
    
        sc.Mutex.Lock()
        defer sc.Mutex.Unlock()
    
        // AddPod has no error result and no logger is visible in this excerpt,
        // so the error is discarded explicitly rather than left in an unused
        // variable.
        // TODO(review): report this error through the project's logger.
        _ = sc.addPod(pod)
    }
    
    // addPod wraps pod in a TaskInfo and stores it through addTask, returning
    // addTask's error.
    // Assumes that lock is already acquired.
    func (sc *SchedulerCache) addPod(pod *v1.Pod) error {
        return sc.addTask(arbapi.NewTaskInfo(pod))
    }

    全局一把锁,以后会是性能瓶颈。这里我们看到kube-batch会将Pod转换成TaskInfo缓存起来。

    kube-batch/pkg/scheduler/api/job_info.go

    // NewTaskInfo builds the TaskInfo that represents pod. The task's resource
    // request is the sum of the requests of the pod's regular containers. Its
    // priority is 1 unless pod.Spec.Priority is set, in which case that value
    // is used.
    func NewTaskInfo(pod *v1.Pod) *TaskInfo {
       // Accumulate the per-container requests.
       // TODO(k82cn): also includes initContainers' resource.
       total := EmptyResource()
       containers := pod.Spec.Containers
       for i := range containers {
          total.Add(NewResource(containers[i].Resources.Requests))
       }
    
       task := &TaskInfo{
          UID:       TaskID(pod.UID),
          Job:       getJobID(pod),
          Name:      pod.Name,
          Namespace: pod.Namespace,
          NodeName:  pod.Spec.NodeName,
          Status:    getTaskStatus(pod),
          Priority:  1, // default, overridden below when the pod carries one
    
          Pod:    pod,
          Resreq: total,
       }
    
       if prio := pod.Spec.Priority; prio != nil {
          task.Priority = *prio
       }
    
       return task
    }

    转换过程比较简单,注意两点:

    • 需要统计资源请求量
    • JobID通过pod.Annotations[arbcorev1.GroupNameAnnotationKey]或者Pod所属的controller来确定

    kube-batch/pkg/scheduler/cache/event_handlers.go

    // addTask files pi under the job named by pi.Job, creating that job's
    // JobInfo in sc.Jobs on first use. A task with an empty Job is not stored.
    // The visible logic has no failure path, so the result is always nil.
    func (sc *SchedulerCache) addTask(pi *arbapi.TaskInfo) error {
       if len(pi.Job) == 0 {
          return nil
       }
    
       // Look the job up once and reuse the result.
       job, found := sc.Jobs[pi.Job]
       if !found {
          job = arbapi.NewJobInfo(pi.Job)
          sc.Jobs[pi.Job] = job
       }
       job.AddTaskInfo(pi)
    
       return nil
    }

    kube-batch/pkg/scheduler/api/job_info.go

    // NewJobInfo returns an empty JobInfo identified by uid. MinAvailable is
    // zero, the resource totals start from EmptyResource(), and the selector,
    // task table and status index are empty, non-nil maps.
    func NewJobInfo(uid JobID) *JobInfo {
       job := &JobInfo{UID: uid}
    
       job.MinAvailable = 0
       job.NodeSelector = map[string]string{}
    
       job.Allocated = EmptyResource()
       job.TotalRequest = EmptyResource()
    
       job.TaskStatusIndex = make(map[TaskStatus]tasksMap)
       job.Tasks = make(tasksMap)
    
       return job
    }
    
    // AddTaskInfo records ti in the job: it stores the task in ji.Tasks under
    // its UID, indexes it by status through addTaskIndex, and adds the task's
    // resource request to ji.TotalRequest.
    //
    // NOTE(review): no check for an existing entry with the same UID is visible
    // here, so adding the same task twice would add its request to TotalRequest
    // twice. Confirm callers remove a task before re-adding it.
    func (ji *JobInfo) AddTaskInfo(ti *TaskInfo) {
        ji.Tasks[ti.UID] = ti
        ji.addTaskIndex(ti)
    
        ji.TotalRequest.Add(ti.Resreq)
    }
    
    // addTaskIndex files ti in ji.TaskStatusIndex under its current status,
    // creating the per-status task map the first time that status is seen.
    func (ji *JobInfo) addTaskIndex(ti *TaskInfo) {
        bucket, found := ji.TaskStatusIndex[ti.Status]
        if !found {
            bucket = tasksMap{}
            ji.TaskStatusIndex[ti.Status] = bucket
        }
    
        bucket[ti.UID] = ti
    }

    最终task会归于一个job,job主要保存tasks,资源请求总量等信息。

    kbinformer.Scheduling().V1alpha2().PodGroups

            // Create the informer for PodGroup (v1alpha2) objects and route its
            // add, update and delete events to the matching SchedulerCache
            // callbacks. The handlers are registered directly, without a filter.
            sc.podGroupInformerv1alpha2 = kbinformer.Scheduling().V1alpha2().PodGroups()
            sc.podGroupInformerv1alpha2.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
                    AddFunc:    sc.AddPodGroupAlpha2,
                    UpdateFunc: sc.UpdatePodGroupAlpha2,
                    DeleteFunc: sc.DeletePodGroupAlpha2,
            })
  • 相关阅读:
    Template(模板)模式
    Android活动(Activity)创建及生命周期
    Android--SharedPreferences数据存储方案
    Adapter适配器模式--图解设计模式
    准时制生产(Just in Time,JIT)
    术语
    制造资源计划(Manufacturing Resource Planning,Mrp II)
    Angualr6表单提交验证并跳转
    Android PDA扫描枪广播接搜条码并使用
    Java统计代码行数
  • 原文地址:https://www.cnblogs.com/dream397/p/14985317.html
Copyright © 2011-2022 走看看