zoukankan      html  css  js  c++  java
  • 图解kubernetes调度器SchedulerCache核心源码实现

    SchedulerCache是kubernetes scheduler中负责本地数据缓存的核心数据结构, 其实现了Cache接口,负责存储从apiserver获取的数据,提供给Scheduler调度器获取Node的信息,然后由调度算法的决策pod的最终node节点,其中Snapshot和节点打散算法非常值得借鉴

    设计目标

    数据感知

    image.png
    SchedulerCache的数据从apiserver通过网络感知,其数据的同步一致性主要是通过kubernetes中的Reflector组件来负责保证,SchedulerCache本身就是一个单纯数据的存储

    Snapshot机制

    image.png
    当scheduler获取一个待调度的pod,则需要从Cache中获取当前集群中的快照数据(当前此时集群中node的统计信息), 用于后续调度流程中使用

    节点打散

    image.png
    节点打散主要是指的调度器调度的时候,在满足调度需求的情况下,为了保证pod均匀分配到所有的node节点上,通常会按照逐个zone逐个node节点进行分配,从而让pod节点打散在整个集群中

    过期删除

    image.png
    Scheduler进行完成调度流程的决策之后,为pod选择了一个node节点,此时还未进行后续的Bind操作,但实际上资源已经分配给该pod, 此时会先更新到本地缓存(),然后再等待apiserver进行数据的广播并且最终被kubelet来进行实际的调度

    但如果因为某些原因导致pod后续的事件都没有被监听到,则需要将对应的pod资源进行删除,并删除对node资源的占用

    cache内部pod状态机

    image.png
    在scheduler cache中pod会一个内部的状态机:initial、Assumed、Expired、Added、Delete,实际上所有的操作都是围绕着该状态机在进行,状态如下:
    Initial: 初始化完成从apiserver监听到(也可能是监听到一个已经完成分配的pod)
    Assumed: 在scheduler中完成分配最终完成bind操作的pod(未实际分配)
    Added: 首先监听到事件可能是一个已经完成实际调度的pod(即从initial到Added),其次可能是经过调度决策后,被实际调度(从Assumed到Added),最后则是后续pod的更新(Update), Added语义上其实就是往Cache中添加一个Pod状态
    Deleted: 某个pod被监听到删除事件,只有被Added过的数据才可以被Deleted
    Expired: Assumed pod经过一段时间后没有感知到真正的分配事件被删除

    源码实现

    数据结构

    type schedulerCache struct {
    	stop   <-chan struct{}
    	ttl    time.Duration
    	period time.Duration
    
    	// 保证数据的安全
    	mu sync.RWMutex
        // 存储假定pod的信息集合,经过scheduler调度后假定pod被调度到某些节点,进行本地临时存储
        // 主要是为了进行node资源的占用,可以通过key在podStats查找到假定的pod信息
    	assumedPods map[string]bool
    	// pod的状态
    	podStates map[string]*podState
        // 存储node的映射
    	nodes     map[string]*nodeInfoListItem
    	csiNodes  map[string]*storagev1beta1.CSINode
    	// node信息的链表,按照最近更新时间来进行连接
    	headNode *nodeInfoListItem
        // 存储node、zone的映射信息
    	nodeTree *NodeTree
    	// 镜像信息
    	imageStates map[string]*imageState
    }
    

    Snapshot机制

    数据结构

    Snapshot数据结构主要负责存储当前集群中的node信息,并且通过Generation记录当前更新的最后一个周期

    type Snapshot struct {
    	NodeInfoMap map[string]*NodeInfo
    	Generation  int64
    }
    

    Snapshot的创建与更新

    创建主要位于kubernetes/pkg/scheduler/core/generic_scheduler.go,实际上就是创建一个空的snapshot对象

    nodeInfoSnapshot:         framework.NodeInfoSnapshot(),
    

    数据的更新则是通过snapshot方法来调用Cache的更新接口来进行更新

    func (g *genericScheduler) snapshot() error {
    	// Used for all fit and priority funcs.
    	return g.cache.UpdateNodeInfoSnapshot(g.nodeInfoSnapshot)
    }
    

    借助headNode实现增量标记

    随着集群中node和pod的数量的增加,如果每次都全量获取snapshot则会严重影响调度器的调度效率,在Cache中通过一个双向链表和node的递增计数(etcd实现)来实现增量更新
    image.png

    func (cache *schedulerCache) UpdateNodeInfoSnapshot(nodeSnapshot *schedulernodeinfo.Snapshot) error {
    	cache.mu.Lock()
    	defer cache.mu.Unlock()
    	balancedVolumesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes)
    
    	// 获取当前snapshot的Genration
    	snapshotGeneration := nodeSnapshot.Generation
    
        // 遍历双向链表,更新snapshot信息
    	for node := cache.headNode; node != nil; node = node.next {
    		if node.info.GetGeneration() <= snapshotGeneration {
    			//所有node信息都更新完毕
    			break
    		}
    		if balancedVolumesEnabled && node.info.TransientInfo != nil {
    			// Transient scheduler info is reset here.
    			node.info.TransientInfo.ResetTransientSchedulerInfo()
    		}
    		if np := node.info.Node(); np != nil {
    			nodeSnapshot.NodeInfoMap[np.Name] = node.info.Clone()
    		}
    	}
    	// 更新snapshot的genration
    	if cache.headNode != nil {
    		nodeSnapshot.Generation = cache.headNode.info.GetGeneration()
    	}
    
        // 如果snapshot里面包含过期的pod信息则进行清理工作
    	if len(nodeSnapshot.NodeInfoMap) > len(cache.nodes) {
    		for name := range nodeSnapshot.NodeInfoMap {
    			if _, ok := cache.nodes[name]; !ok {
    				delete(nodeSnapshot.NodeInfoMap, name)
    			}
    		}
    	}
    	return nil
    }
    
    

    nodeTree

    nodeTree主要负责节点的打散,用于让pod均匀分配在多个zone中的node节点上

    2.3.1 数据结构

    type NodeTree struct {
    	tree      map[string]*nodeArray // 存储zone和zone下面的node信息
    	zones     []string              // 存储zones
    	zoneIndex int
    	numNodes  int
    	mu        sync.RWMutex
    }
    

    其中zones和zoneIndex主要用于后面的节点打散算法使用,实现按zone逐个分配

    nodeArray

    nodeArray负责存储一个zone下面的所有node节点,并且通过lastIndex记录当前zone分配的节点索引

    type nodeArray struct {
    	nodes     []string
    	lastIndex int
    }
    

    添加node

    添加node其实很简单,只需要获取对应node的zone信息,然后加入对应zone的nodeArray中

    func (nt *NodeTree) addNode(n *v1.Node) {
    	// 获取zone
    	zone := utilnode.GetZoneKey(n)
    	if na, ok := nt.tree[zone]; ok {
    		for _, nodeName := range na.nodes {
    			if nodeName == n.Name {
    				klog.Warningf("node %q already exist in the NodeTree", n.Name)
    				return
    			}
    		}
            // 吧节点加入到zone中
    		na.nodes = append(na.nodes, n.Name)
    	} else {
            // 新加入zone
    		nt.zones = append(nt.zones, zone)
    		nt.tree[zone] = &nodeArray{nodes: []string{n.Name}, lastIndex: 0}
    	}
    	klog.V(2).Infof("Added node %q in group %q to NodeTree", n.Name, zone)
    	nt.numNodes++
    }
    

    数据打散算法

    image.png
    数据打散算法很简单,首先我们存储了zone和nodeArray的信息,然后我们只需要通过两个索引zoneIndex和nodeIndex就可以实现节点的打散操作, 只有当当前集群中所有zone里面的所有节点都进行一轮分配后,然后重建分配索引

    func (nt *NodeTree) Next() string {
    	nt.mu.Lock()
    	defer nt.mu.Unlock()
    	if len(nt.zones) == 0 {
    		return ""
    	}
        // 记录分配完所有node的zone的计数,用于进行状态重置
        // 比如有3个zone: 则当numExhaustedZones=3的时候,就会重新从头开始进行分配
    	numExhaustedZones := 0
    	for {
    		if nt.zoneIndex >= len(nt.zones) {
    			nt.zoneIndex = 0
    		}
            // 按照zone索引来进行逐个zone分配
    		zone := nt.zones[nt.zoneIndex]
    		nt.zoneIndex++
    		// 返回当前zone下面的next节点,如果exhausted为True则表明当前zone所有的节点,在这一轮调度中都已经分配了一次
            // 就需要从下个zone继续获取节点
    		nodeName, exhausted := nt.tree[zone].next()
    		if exhausted {
    			numExhaustedZones++
                // 所有的zone下面的node都被分配了一次,这里进行重置,从头开始继续分配
    			if numExhaustedZones >= len(nt.zones) { // all zones are exhausted. we should reset.
    				nt.resetExhausted()
    			}
    		} else {
    			return nodeName
    		}
    	}
    }
    

    重建索引

    重建索引则是将所有nodeArray的索引和当前zoneIndex进行归零

    
    func (nt *NodeTree) resetExhausted() {// 重置索引
    	for _, na := range nt.tree {
    		na.lastIndex = 0
    	}
    	nt.zoneIndex = 0
    }
    
    

    数据过期清理

    数据存储

    Cache要定时将之前在经过本地scheduler分配完成后的假设的pod的信息进行清理,如果这些pod在给定时间内仍然没有感知到对应的pod真正的添加事件则就这些pod删除

    assumedPods map[string]bool
    

    后台定时任务

    默认每30s进行清理一次

    func (cache *schedulerCache) run() {
    	go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
    }
    

    清理逻辑

    清理逻辑主要是针对那些已经完成绑定的pod来进行,如果一个pod完成了在scheduler里面的所有操作后,会有一个过期时间,当前是30s,如果超过该时间即deadline小于当前的时间就删除该pod

    
    // cleanupAssumedPods exists for making test deterministic by taking time as input argument.
    func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
    	cache.mu.Lock()
    	defer cache.mu.Unlock()
    
    	// The size of assumedPods should be small
    	for key := range cache.assumedPods {
    		ps, ok := cache.podStates[key]
    		if !ok {
    			panic("Key found in assumed set but not in podStates. Potentially a logical error.")
    		}
            // 未完成绑定的pod不会被进行清理
    		if !ps.bindingFinished {
    			klog.V(3).Infof("Couldn't expire cache for pod %v/%v. Binding is still in progress.",
    				ps.pod.Namespace, ps.pod.Name)
    			continue
    		}
            // 在完成bind之后会设定一个过期时间,目前是30s,如果deadline即bind时间+30s小于当前时间就过期删除
    		if now.After(*ps.deadline) {
    			klog.Warningf("Pod %s/%s expired", ps.pod.Namespace, ps.pod.Name)
    			if err := cache.expirePod(key, ps); err != nil {
    				klog.Errorf("ExpirePod failed for %s: %v", key, err)
    			}
    		}
    	}
    }
    

    清理pod

    清理pod主要分为如下几个部分:
    1.对应pod假定分配node的信息
    2.清理映射的podState信息

    func (cache *schedulerCache) expirePod(key string, ps *podState) error {
    	if err := cache.removePod(ps.pod); err != nil {
    		return err
    	}
    	delete(cache.assumedPods, key)
    	delete(cache.podStates, key)
    	return nil
    }
    
    

    设计总结

    image.png
    核心数据结构数据流如上所示,其核心是通过nodes、headNode实现一个Snapshot为调度器提供当前系统资源的快照,并通过nodeTree进行node节点的打散,最后内部通过一个pod的状态机来进行系统内部的pod资源状态的转换,并通过后台的定时任务来保证经过经过Reflector获取的数据的最终一致性(删除那些经过bind的但是却没被实际调度或者事件丢失的pod), 借助这些其实一个最基础的工业级调度器的本地cache功能就实现了

    微信号:baxiaoshi2020
    关注公告号阅读更多源码分析文章 21天大棚
    更多文章关注 www.sreguide.com
    本文由博客一文多发平台 OpenWrite 发布

  • 相关阅读:
    SQL Server-数据库架构和对象、定义数据完整性(二)
    SQL Server-语句类别、数据库范式、系统数据库组成(一)
    ASP.NET WebAPi之断点续传下载(下)
    ConcurrentDictionary线程不安全么,你难道没疑惑,你难道弄懂了么?
    ASP.NET WebAPi之断点续传下载(中)
    ASP.NET WebAPi之断点续传下载(上)
    ASP.NET WebAPi(selfhost)之文件同步或异步上传
    JSTL fn:contains()函数
    用jstl标签判断一个字符串是否包含了另一个字符串
    fn:replace()函数
  • 原文地址:https://www.cnblogs.com/buyicoding/p/12190532.html
Copyright © 2011-2022 走看看