The random scheduler v1
The first important part of the scheduler is to watch newly created pods in a continuous loop. We can use the Kubernetes client-go SDK’s Watch method on pods, which gives us a channel of events that we can range over:
watch, err := s.clientset.CoreV1().Pods("").Watch(metav1.ListOptions{
    FieldSelector: fmt.Sprintf("spec.schedulerName=%s,spec.nodeName=", schedulerName),
})

...

for event := range watch.ResultChan() {
    if event.Type != "ADDED" {
        continue
    }
    p := event.Object.(*v1.Pod)
    fmt.Println("found a pod to schedule:", p.Namespace, "/", p.Name)
    ...
}
The Watch function takes a FieldSelector as an input parameter. We added two parts to that selector. spec.nodeName= means that we are only interested in pods that don’t have a nodeName set; if they have a nodeName, it means they are already scheduled.
The spec.schedulerName=random-scheduler part is more interesting. Kubernetes is able to run multiple schedulers in a cluster, and a pod specification can contain a scheduler name (see example later). If a scheduler name is set, the corresponding scheduler is expected to bind that pod to a node; no scheduler name means the default scheduler. But this field selector also means that it is each scheduler’s own responsibility to find the pods that belong to it. If you leave this field selector out of your implementation, you can mess things up: multiple schedulers may think that the same pod belongs to them, and a race condition can arise.
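As a quick illustration (the deployment example with this field set comes later in the post; the pod and container names below are made up), a pod that should be handled by our scheduler carries the scheduler’s name in its spec. A minimal sketch using the client-go types:

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    // Illustrative only: a pod that asks to be scheduled by "random-scheduler".
    pod := &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "sleepy", // hypothetical pod name
            Namespace: "default",
        },
        Spec: v1.PodSpec{
            // Matches the spec.schedulerName part of the field selector above.
            SchedulerName: "random-scheduler",
            Containers: []v1.Container{
                {Name: "sleep", Image: "alpine", Command: []string{"sleep", "3600"}},
            },
        },
    }
    fmt.Println(pod.Spec.SchedulerName)
}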
When ranging through the events provided by the watch, we also need to filter for ADDED events, because we only care about newly added pods.
The next part is to find a fitting node. In our very simple example, we’ll select a random node from the list of nodes. Note that we are querying the apiserver for the list of nodes on every scheduling event; that’s not great for performance, but more on that later.
nodes, _ := s.clientset.CoreV1().Nodes().List(metav1.ListOptions{})
return &nodes.Items[rand.Intn(len(nodes.Items))], nil
After we’ve found a node for our pod, the only important thing left is to let the apiserver know. We can do that through the Bind function; it’s quite simple:
s.clientset.CoreV1().Pods(p.Namespace).Bind(&v1.Binding{
    ObjectMeta: metav1.ObjectMeta{
        Name:      p.Name,
        Namespace: p.Namespace,
    },
    Target: v1.ObjectReference{
        APIVersion: "v1",
        Kind:       "Node",
        Name:       randomNode.Name,
    },
})
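Putting the three pieces together, the whole scheduling loop fits on a single page. The following is only a sketch, not the full implementation from this post: it assumes a scheduler struct holding the clientset, inlines the random node selection, elides most error handling, and uses the same (older) client-go call signatures as the snippets above (newer client-go versions also take a context and extra options):

package main

import (
    "fmt"
    "log"
    "math/rand"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

const schedulerName = "random-scheduler"

type scheduler struct {
    clientset *kubernetes.Clientset
}

func (s *scheduler) run() error {
    // Watch only unscheduled pods that name us as their scheduler.
    watch, err := s.clientset.CoreV1().Pods("").Watch(metav1.ListOptions{
        FieldSelector: fmt.Sprintf("spec.schedulerName=%s,spec.nodeName=", schedulerName),
    })
    if err != nil {
        return err
    }

    for event := range watch.ResultChan() {
        if event.Type != "ADDED" {
            continue
        }
        p := event.Object.(*v1.Pod)

        // Pick a random node, as in the snippet above.
        nodes, err := s.clientset.CoreV1().Nodes().List(metav1.ListOptions{})
        if err != nil || len(nodes.Items) == 0 {
            log.Printf("no schedulable node found for %s/%s", p.Namespace, p.Name)
            continue
        }
        randomNode := nodes.Items[rand.Intn(len(nodes.Items))]

        // Tell the apiserver about our decision.
        err = s.clientset.CoreV1().Pods(p.Namespace).Bind(&v1.Binding{
            ObjectMeta: metav1.ObjectMeta{Name: p.Name, Namespace: p.Namespace},
            Target:     v1.ObjectReference{APIVersion: "v1", Kind: "Node", Name: randomNode.Name},
        })
        if err != nil {
            log.Printf("failed to bind pod %s/%s: %v", p.Namespace, p.Name, err)
        }
    }
    return nil
}

func main() {
    // Hypothetical kubeconfig path; in-cluster config would work the same way.
    config, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
    if err != nil {
        log.Fatal(err)
    }
    s := &scheduler{clientset: kubernetes.NewForConfigOrDie(config)}
    log.Fatal(s.run())
}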
The random scheduler v2: Informers
If you don’t want to go into more detail about the scheduler implementation (caching through informers), you can skip this section and move on to the next one. In the v1 version of our scheduler we didn’t really care about performance, but the default Kubernetes scheduler is known for its performance: it is able to schedule thousands of pods in only a few minutes, so it’s worth checking out some of the tricks it uses. One of those tricks is to use informers instead of querying all the nodes and pods all the time.
The Kubernetes documentation says this about informers:
SharedInformers provide hooks to receive notifications of adds, updates, and deletes for a particular resource. They also provide convenience functions for accessing shared caches.
So, for example, if we add a node informer to our code, it will cache the nodes in the cluster and update the state of the cache when a new node is added or an existing node is deleted. We’ll no longer have to query the apiserver for the list of nodes every time we’re scheduling a pod! When initializing the scheduler, we can create the informer and return its node lister:
nodeInformer := factory.Core().V1().Nodes()
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: func(obj interface{}) {
        node := obj.(*v1.Node)
        log.Printf("New Node Added to Store: %s", node.GetName())
    },
})
factory.Start(quit)
return nodeInformer.Lister()
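With the lister in hand, finding a node for a pod no longer has to hit the apiserver at all: the random pick from v1 can read from the informer’s local cache instead. A small sketch, assuming the scheduler keeps the returned lister in a nodeLister field (the field name and the findFit helper are made up for illustration):

package scheduler

import (
    "errors"
    "math/rand"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/labels"
    listersv1 "k8s.io/client-go/listers/core/v1"
)

type scheduler struct {
    nodeLister listersv1.NodeLister // returned by the informer setup above
}

// findFit picks a random node from the informer's local cache
// instead of listing nodes through the apiserver on every event.
func (s *scheduler) findFit() (*v1.Node, error) {
    nodes, err := s.nodeLister.List(labels.Everything())
    if err != nil {
        return nil, err
    }
    if len(nodes) == 0 {
        return nil, errors.New("no nodes in the cache")
    }
    return nodes[rand.Intn(len(nodes))], nil
}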
For a more complete, real-world example, take a look at kube-batch: its newSchedulerCache function sets up informers for every resource it cares about (nodes, pods, PodDisruptionBudgets, PriorityClasses, PodGroups and Queues):
func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue string) *SchedulerCache {
    sc := &SchedulerCache{
        Jobs:            make(map[kbapi.JobID]*kbapi.JobInfo),
        Nodes:           make(map[string]*kbapi.NodeInfo),
        Queues:          make(map[kbapi.QueueID]*kbapi.QueueInfo),
        PriorityClasses: make(map[string]*v1beta1.PriorityClass),
        errTasks:        workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
        deletedJobs:     workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
        kubeclient:      kubernetes.NewForConfigOrDie(config),
        kbclient:        kbver.NewForConfigOrDie(config),
        defaultQueue:    defaultQueue,
        schedulerName:   schedulerName,
    }

    // Prepare event clients.
    broadcaster := record.NewBroadcaster()
    broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: sc.kubeclient.CoreV1().Events("")})
    sc.Recorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: schedulerName})

    sc.Binder = &defaultBinder{
        kubeclient: sc.kubeclient,
    }

    sc.Evictor = &defaultEvictor{
        kubeclient: sc.kubeclient,
    }

    sc.StatusUpdater = &defaultStatusUpdater{
        kubeclient: sc.kubeclient,
        kbclient:   sc.kbclient,
    }

    informerFactory := informers.NewSharedInformerFactory(sc.kubeclient, 0)

    sc.nodeInformer = informerFactory.Core().V1().Nodes()
    sc.pvcInformer = informerFactory.Core().V1().PersistentVolumeClaims()
    sc.pvInformer = informerFactory.Core().V1().PersistentVolumes()
    sc.scInformer = informerFactory.Storage().V1().StorageClasses()
    sc.VolumeBinder = &defaultVolumeBinder{
        volumeBinder: volumebinder.NewVolumeBinder(
            sc.kubeclient,
            sc.nodeInformer,
            sc.pvcInformer,
            sc.pvInformer,
            sc.scInformer,
            30*time.Second,
        ),
    }

    // create informer for node information
    sc.nodeInformer = informerFactory.Core().V1().Nodes()
    sc.nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    sc.AddNode,
            UpdateFunc: sc.UpdateNode,
            DeleteFunc: sc.DeleteNode,
        },
        0,
    )

    // create informer for pod information
    sc.podInformer = informerFactory.Core().V1().Pods()
    sc.podInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch obj.(type) {
                case *v1.Pod:
                    pod := obj.(*v1.Pod)
                    if strings.Compare(pod.Spec.SchedulerName, schedulerName) == 0 && pod.Status.Phase == v1.PodPending {
                        return true
                    }
                    return pod.Status.Phase != v1.PodPending
                default:
                    return false
                }
            },
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    sc.AddPod,
                UpdateFunc: sc.UpdatePod,
                DeleteFunc: sc.DeletePod,
            },
        })

    sc.pdbInformer = informerFactory.Policy().V1beta1().PodDisruptionBudgets()
    sc.pdbInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    sc.AddPDB,
        UpdateFunc: sc.UpdatePDB,
        DeleteFunc: sc.DeletePDB,
    })

    sc.pcInformer = informerFactory.Scheduling().V1beta1().PriorityClasses()
    sc.pcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    sc.AddPriorityClass,
        UpdateFunc: sc.UpdatePriorityClass,
        DeleteFunc: sc.DeletePriorityClass,
    })

    kbinformer := kbinfo.NewSharedInformerFactory(sc.kbclient, 0)

    // create informer for PodGroup(v1alpha1) information
    sc.podGroupInformerv1alpha1 = kbinformer.Scheduling().V1alpha1().PodGroups()
    sc.podGroupInformerv1alpha1.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    sc.AddPodGroupAlpha1,
        UpdateFunc: sc.UpdatePodGroupAlpha1,
        DeleteFunc: sc.DeletePodGroupAlpha1,
    })

    // create informer for PodGroup(v1alpha2) information
    sc.podGroupInformerv1alpha2 = kbinformer.Scheduling().V1alpha2().PodGroups()
    sc.podGroupInformerv1alpha2.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    sc.AddPodGroupAlpha2,
        UpdateFunc: sc.UpdatePodGroupAlpha2,
        DeleteFunc: sc.DeletePodGroupAlpha2,
    })

    // create informer for Queue(v1alpha1) information
    sc.queueInformerv1alpha1 = kbinformer.Scheduling().V1alpha1().Queues()
    sc.queueInformerv1alpha1.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    sc.AddQueuev1alpha1,
        UpdateFunc: sc.UpdateQueuev1alpha1,
        DeleteFunc: sc.DeleteQueuev1alpha1,
    })

    // create informer for Queue(v1alpha2) information
    sc.queueInformerv1alpha2 = kbinformer.Scheduling().V1alpha2().Queues()
    sc.queueInformerv1alpha2.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    sc.AddQueuev1alpha2,
        UpdateFunc: sc.UpdateQueuev1alpha2,
        DeleteFunc: sc.DeleteQueuev1alpha2,
    })

    return sc
}
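One more detail worth keeping in mind, whichever scheduler you write: informer caches are only useful once they have synced. The sketch below is generic client-go startup code, not kube-batch’s actual startup logic, and the kubeconfig path is a placeholder:

package main

import (
    "log"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Hypothetical kubeconfig path; in-cluster config would work the same way.
    config, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
    if err != nil {
        log.Fatal(err)
    }
    clientset := kubernetes.NewForConfigOrDie(config)

    stopCh := make(chan struct{})
    defer close(stopCh)

    factory := informers.NewSharedInformerFactory(clientset, 0)
    nodeInformer := factory.Core().V1().Nodes().Informer()
    podInformer := factory.Core().V1().Pods().Informer()

    // Start all informers registered with the factory and block
    // until their caches are warm before making scheduling decisions.
    factory.Start(stopCh)
    if !cache.WaitForCacheSync(stopCh, nodeInformer.HasSynced, podInformer.HasSynced) {
        log.Fatal("failed to sync informer caches")
    }

    // ... scheduling loop goes here ...
}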