kube-batch scheduler

     

    The random scheduler v1

    The first important part of the scheduler is to watch newly created pods in a continuous loop. We can use the Kubernetes client-go SDK's Watch method on pods, which gives us a channel of events that we can range over.

    watch, err := s.clientset.CoreV1().Pods("").Watch(metav1.ListOptions{
      FieldSelector: fmt.Sprintf("spec.schedulerName=%s,spec.nodeName=", schedulerName),
    })
    
    ...
    
    for event := range watch.ResultChan() {
      if event.Type != "ADDED" {
        continue
      }
      p := event.Object.(*v1.Pod)
      fmt.Println("found a pod to schedule:", p.Namespace, "/", p.Name)
      ...
    }

    The Watch function takes a FieldSelector as an input parameter. We added two parts to that selector. spec.nodeName= means that we are only interested in pods that don't have a nodeName set; if they have one, they are already scheduled.

    The spec.schedulerName=random-scheduler part is more interesting. Kubernetes is able to run multiple schedulers in a cluster, and a pod specification can contain a scheduler name (see the sketch below). If a scheduler name is set, the corresponding scheduler is expected to bind that pod to a node; if no scheduler name is set, the pod goes to the default scheduler. But this field selector means that it is the scheduler's own responsibility to find the pods that belong to it. If you leave this field selector out of your implementation, you can mess things up: multiple schedulers may think that a pod belongs to them, and a race condition can arise.
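    For illustration, here is a minimal sketch of a pod that targets our scheduler, built with the client-go types (the pod name and image are made up):

    pod := &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "sleep",   // illustrative name
            Namespace: "default",
        },
        Spec: v1.PodSpec{
            SchedulerName: "random-scheduler", // ties the pod to our scheduler
            Containers: []v1.Container{{
                Name:    "sleep",
                Image:   "busybox",
                Command: []string{"sleep", "3600"},
            }},
        },
    }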

    When ranging over the events provided by the watch, we also need to filter for ADDED events, because we only care about newly added pods.

    The next part is to find a fitting node. In our very simple example, we'll select a random node from the list of nodes. Note that we query the apiserver for the list of nodes on every scheduling event; that's bad for performance, but more on that later.

    nodes, err := s.clientset.CoreV1().Nodes().List(metav1.ListOptions{})
    if err != nil {
        return nil, err
    }
    return &nodes.Items[rand.Intn(len(nodes.Items))], nil

    After we've found a node for our pod, the only important thing left is to let the apiserver know. We can do that through the Bind function; it's quite simple:

    err := s.clientset.CoreV1().Pods(p.Namespace).Bind(&v1.Binding{
        ObjectMeta: metav1.ObjectMeta{
            Name:      p.Name,
            Namespace: p.Namespace,
        },
        Target: v1.ObjectReference{
            APIVersion: "v1",
            Kind:       "Node",
            Name:       randomNode.Name,
        },
    })
    if err != nil {
        return err
    }
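    After a successful bind it's also good practice to emit a Scheduled event, so the decision shows up in kubectl describe pod. A minimal sketch, assuming schedulerName and the bound pod p are in scope (the message text is illustrative):

    timestamp := time.Now().UTC()
    s.clientset.CoreV1().Events(p.Namespace).Create(&v1.Event{
        ObjectMeta: metav1.ObjectMeta{
            GenerateName: p.Name + "-",
        },
        InvolvedObject: v1.ObjectReference{
            Kind:      "Pod",
            Name:      p.Name,
            Namespace: p.Namespace,
            UID:       p.UID,
        },
        Reason:         "Scheduled",
        Message:        "Placed pod on a random node", // illustrative message
        Type:           "Normal",
        Count:          1,
        FirstTimestamp: metav1.NewTime(timestamp),
        LastTimestamp:  metav1.NewTime(timestamp),
        Source:         v1.EventSource{Component: schedulerName},
    })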

     

    The random scheduler v2: Informers

    If you don't want to go into more detail about a scheduler implementation (caching through informers), you can skip this section and move on to the next one. In the v1 version of our scheduler we didn't really care about performance. But the default Kubernetes scheduler is known for its performance: it is able to schedule thousands of pods in only a few minutes, so it's worth checking out some of the tricks it uses. One of those tricks is to use informers instead of querying all the nodes and pods all the time.

    The Kubernetes documentation says this about informers:

    SharedInformers provide hooks to receive notifications of adds, updates, and deletes for a particular resource. They also provide convenience functions for accessing shared caches.

    So for example if we add a node informer to our code, it will be able to cache the nodes in the cluster, and it will update the state of the cache if a new node is added, or an existing node is deleted. We’ll no longer have to query the apiserver for the list of nodes every time we’re scheduling a pod! When initializing the scheduler, we can create the informer and return its node lister.

    // create a shared informer factory from our clientset (0 disables resync)
    factory := informers.NewSharedInformerFactory(clientset, 0)
    nodeInformer := factory.Core().V1().Nodes()
    nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            node := obj.(*v1.Node)
            log.Printf("New Node Added to Store: %s", node.GetName())
        },
    })
    factory.Start(quit) // quit is a chan struct{}, closed on shutdown
    return nodeInformer.Lister()
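    With the lister in hand, the random node selection from v1 can read from the shared in-memory cache instead of hitting the apiserver. A sketch, assuming the lister is stored on the scheduler struct as nodeLister (labels comes from k8s.io/apimachinery/pkg/labels):

    // list nodes from the informer cache; no apiserver round trip
    nodes, err := s.nodeLister.List(labels.Everything())
    if err != nil {
        return nil, err
    }
    return nodes[rand.Intn(len(nodes))], nil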

    kube-batch: newSchedulerCache

    kube-batch applies the same informer pattern at a much larger scale. Its newSchedulerCache constructor builds the scheduler's cache and registers event handlers for every resource it tracks: nodes, pods, PodDisruptionBudgets, PriorityClasses, and kube-batch's own PodGroup and Queue CRDs (in both v1alpha1 and v1alpha2), while the PersistentVolume, PersistentVolumeClaim and StorageClass informers feed the volume binder.

    func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue string) *SchedulerCache {
        sc := &SchedulerCache{
            Jobs:            make(map[kbapi.JobID]*kbapi.JobInfo),
            Nodes:           make(map[string]*kbapi.NodeInfo),
            Queues:          make(map[kbapi.QueueID]*kbapi.QueueInfo),
            PriorityClasses: make(map[string]*v1beta1.PriorityClass),
            errTasks:        workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
            deletedJobs:     workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
            kubeclient:      kubernetes.NewForConfigOrDie(config),
            kbclient:        kbver.NewForConfigOrDie(config),
            defaultQueue:    defaultQueue,
            schedulerName:   schedulerName,
        }
    
        // Prepare event clients.
        broadcaster := record.NewBroadcaster()
        broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: sc.kubeclient.CoreV1().Events("")})
        sc.Recorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: schedulerName})
    
        sc.Binder = &defaultBinder{
            kubeclient: sc.kubeclient,
        }
    
        sc.Evictor = &defaultEvictor{
            kubeclient: sc.kubeclient,
        }
    
        sc.StatusUpdater = &defaultStatusUpdater{
            kubeclient: sc.kubeclient,
            kbclient:   sc.kbclient,
        }
    
        informerFactory := informers.NewSharedInformerFactory(sc.kubeclient, 0)
    
        sc.nodeInformer = informerFactory.Core().V1().Nodes()
        sc.pvcInformer = informerFactory.Core().V1().PersistentVolumeClaims()
        sc.pvInformer = informerFactory.Core().V1().PersistentVolumes()
        sc.scInformer = informerFactory.Storage().V1().StorageClasses()
        sc.VolumeBinder = &defaultVolumeBinder{
            volumeBinder: volumebinder.NewVolumeBinder(
                sc.kubeclient,
                sc.nodeInformer,
                sc.pvcInformer,
                sc.pvInformer,
                sc.scInformer,
                30*time.Second,
            ),
        }
    
        // register event handlers for node information (the shared factory
        // returns the same node informer instance created above)
        sc.nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
            cache.ResourceEventHandlerFuncs{
                AddFunc:    sc.AddNode,
                UpdateFunc: sc.UpdateNode,
                DeleteFunc: sc.DeleteNode,
            },
            0,
        )
    
        // create informer for pod information
        sc.podInformer = informerFactory.Core().V1().Pods()
        sc.podInformer.Informer().AddEventHandler(
            cache.FilteringResourceEventHandler{
                FilterFunc: func(obj interface{}) bool {
                    switch pod := obj.(type) {
                    case *v1.Pod:
                        // keep pending pods assigned to this scheduler...
                        if pod.Spec.SchedulerName == schedulerName && pod.Status.Phase == v1.PodPending {
                            return true
                        }
                        // ...and all non-pending pods, so resources already
                        // consumed on the nodes stay accounted for in the cache
                        return pod.Status.Phase != v1.PodPending
                    default:
                        return false
                    }
                },
                Handler: cache.ResourceEventHandlerFuncs{
                    AddFunc:    sc.AddPod,
                    UpdateFunc: sc.UpdatePod,
                    DeleteFunc: sc.DeletePod,
                },
            })
    
        sc.pdbInformer = informerFactory.Policy().V1beta1().PodDisruptionBudgets()
        sc.pdbInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    sc.AddPDB,
            UpdateFunc: sc.UpdatePDB,
            DeleteFunc: sc.DeletePDB,
        })
    
        sc.pcInformer = informerFactory.Scheduling().V1beta1().PriorityClasses()
        sc.pcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    sc.AddPriorityClass,
            UpdateFunc: sc.UpdatePriorityClass,
            DeleteFunc: sc.DeletePriorityClass,
        })
    
        kbinformer := kbinfo.NewSharedInformerFactory(sc.kbclient, 0)
        // create informer for PodGroup(v1alpha1) information
        sc.podGroupInformerv1alpha1 = kbinformer.Scheduling().V1alpha1().PodGroups()
        sc.podGroupInformerv1alpha1.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    sc.AddPodGroupAlpha1,
            UpdateFunc: sc.UpdatePodGroupAlpha1,
            DeleteFunc: sc.DeletePodGroupAlpha1,
        })
    
        // create informer for PodGroup(v1alpha2) information
        sc.podGroupInformerv1alpha2 = kbinformer.Scheduling().V1alpha2().PodGroups()
        sc.podGroupInformerv1alpha2.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    sc.AddPodGroupAlpha2,
            UpdateFunc: sc.UpdatePodGroupAlpha2,
            DeleteFunc: sc.DeletePodGroupAlpha2,
        })
    
        // create informer for Queue(v1alpha1) information
        sc.queueInformerv1alpha1 = kbinformer.Scheduling().V1alpha1().Queues()
        sc.queueInformerv1alpha1.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    sc.AddQueuev1alpha1,
            UpdateFunc: sc.UpdateQueuev1alpha1,
            DeleteFunc: sc.DeleteQueuev1alpha1,
        })
    
        // create informer for Queue(v1alpha2) information
        sc.queueInformerv1alpha2 = kbinformer.Scheduling().V1alpha2().Queues()
        sc.queueInformerv1alpha2.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    sc.AddQueuev1alpha2,
            UpdateFunc: sc.UpdateQueuev1alpha2,
            DeleteFunc: sc.DeleteQueuev1alpha2,
        })
    
        return sc
    }
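    Once constructed, the cache's informers still have to be started and synced before a scheduling session can run. A minimal sketch of the call sequence (method names follow kube-batch's cache interface; the config, scheduler name and queue values are illustrative):

    sc := newSchedulerCache(config, "kube-batch", "default")

    stopCh := make(chan struct{})
    defer close(stopCh)

    sc.Run(stopCh)              // starts every registered informer
    sc.WaitForCacheSync(stopCh) // blocks until the shared caches are primed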

    Custom schedulers

    Writing custom Kubernetes schedulers

    Original post: https://www.cnblogs.com/dream397/p/14981799.html