zoukankan      html  css  js  c++  java
  • 基于 gpusharedeviceplugin 的 k8s device plugin 分析

    k8s device plugin 分析

    示例为 gpushare-device-plugin

    • device plugin 一般以 daemonset 部署在 node 节点上;
    • GRPC server,信道是 Unix socket,主要实现 ListAndWatchAllocate rpc,收集 device 信息、准备 device 环境;
    • 被 kubelet 中 device manager 管理

    device plugin 工作流程

    device plugin 下文简称 DP,device manager 下文简称 DM

    1. 以 daemonset 启动,立刻启动 GRPC server,监听独特的 Unix Socket 地址;
    2. 执行 register,DP ---register----> DM (GRPC server) ----ListAndWatch----> DP ;
    3. DM send rpc ListAndWatch 获得 device 信息,faked device ID + is_health;
    4. DM send rpc Allocate 得到启动容器所需的信息(env,mount,device,annotation);

    技术细节

    DP 启动的入口函数?

    1. 制作 docker
      Dockerfile 中编译 go 代码,得到 gpushare-device-plugin-v2,将其放置到 /usr/bin/,Entrypoint 为 gpushare-device-plugin-v2 -logtostderr

    2. daemonset 启动该容器后,会执行 gpushare-device-plugin-v2 -logtostderr 启动

    3. 主文件 cmd/nvidia/main.go

    	// 入口函数在此
    	ngm := nvidia.NewSharedGPUManager(*mps, *healthCheck, translatememoryUnits(*memoryUnit))
    	err := ngm.Run()
    

    DP 流程

    shareGPUManager ---run()---> NvidiaDevicePlugin ----Serve()----> Start GRPC server and Register

    DP 如何知道 DM 的 Unix socket 地址?

    这个地址是常量,不可改变

    // gpushare-device-plugin/pkg/gpu/nvidia/server.go
    import (
    	pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1")
    
    err = m.Register(pluginapi.KubeletSocket, resourceName)
    

    pluginapi.KubeletSocket 是常量 kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1/constants.go

    package v1beta1
    
    const (
    	// Healthy means that the device is healthy
    	Healthy = "Healthy"
    	// UnHealthy means that the device is unhealthy
    	Unhealthy = "Unhealthy"
    
    	// Current version of the API supported by kubelet
    	Version = "v1beta1"
    	// DevicePluginPath is the folder the Device Plugin is expecting sockets to be on
    	// Only privileged pods have access to this path
    	// Note: Placeholder until we find a "standard path"
    	DevicePluginPath = "/var/lib/kubelet/device-plugins/"
    	// KubeletSocket is the path of the Kubelet registry socket
    	KubeletSocket = DevicePluginPath + "kubelet.sock"
    	// Timeout duration in secs for PreStartContainer RPC
    	KubeletPreStartContainerRPCTimeoutInSecs = 30
    )
    
    var SupportedVersions = [...]string{"v1beta1"}
    

    resourceName 是常量 gpushare-device-plugin/pkg/gpu/nvidia/const.go

    const (
    	resourceName  = "aliyun.com/gpu-mem")
    

    Register 的细节

    参数包括:DM 与 DP 通信的版本号DP 的 Unix socket 名字资源的类型名

    1. 通过 kubelet Unix socket 创建 grpc 连接
    conn, err := dial(kubeletEndpoint, 5*time.Second)
    client := pluginapi.NewRegistrationClient(conn)
    
    1. 构造参数:
    // 其中三者都是常量:
    // Endpoint = aliyungpushare.sock
    // pluginapi.Version = v1beta1
    // resourceName = aliyun.com/gpu-mem
    
    reqt := &pluginapi.RegisterRequest{
    	Version:      pluginapi.Version,
    	Endpoint:     path.Base(m.socket),
    	ResourceName: resourceName,
    }
    
    1. 调用 DM 的 rpc register
    _, err = client.Register(context.Background(), reqt)
    
    1. DM 的处理

    检查参数是否合法,创建 endpointImpl 来管理。

    kubernetes/pkg/kubelet/cm/devicemanager/manager.go

    go m.addEndpoint(r)
    

    注意,register rpc 没有返回值,但是第二个参数如果不是 nil 则就是有错误

    // 正确
    return &pluginapi.Empty{}, nil
    
    // 出错
    return &pluginapi.Empty{}, fmt.Errorf(errorString)
    

    详细流程:

    • 执行 m.addEndpoint(r) 增加 endpoint
    • 创建 endpointImpl 主要是,建立与 DP 的 grpc 长连接,填入 callback 函数(genericDeviceUpdateCallback)
    • 在 ManagerImpl 的 m.endpoints (dict 数据结构) 中加入 endpointImpl
    • 执行 runEndpoint,调用一次 ListAndWatch
    • 进入一个死循环:通过 grpc stream 不断的获取返回的状态,用 callback 函数来处理
    • callback 函数,将 DP 上报的所有 device 设备的 id + is_health 填入 m.healthyDevices[resourceName] 和 m.unhealthyDevices[resourceName]

    综上,当 DP 发来注册消息后,DM 组织几个结构:

    1. m.endpoints[resourceName]
    2. 调用 ListAndWatch 后,立刻组织 m.healthyDevices[resourceName]m.unhealthyDevices[resourceName]
    3. m.unhealthyDevices 应该有别的函数会定期处理,一旦有新值,会进入一系列的处理

    DM 调用 DP ListAndWatch 的时机?

    register 时候会调用一次,而且仅仅就这一次,因为 ListAndWatch 是个 GRPC 长连接,DP 可以通过这个长连接不停的反馈。

    ListAndWatch 的参数

    参见说明 kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1/api.proto

    rpc ListAndWatch(Empty) returns (stream ListAndWatchResponse) {}
    
    message ListAndWatchResponse {
    	repeated Device devices = 1;
    }
    
    message Device {
    	// A unique ID assigned by the device plugin used
    	// to identify devices during the communication
    	// Max length of this field is 63 characters
    	string ID = 1;
    	// Health of the device, can be healthy or unhealthy, see constants.go
    	string health = 2;
    }
    

    由 proto 文件中所示,DM 查询无需参数,回复的内容是 repeated deviceID and is_health

    注意 gpushare-device-plugin 返回的是 faked device ID,比如这个机器上插了 8 GPU 16GB 内存。

    我想大家和我一样,认为是 8 个 GPU 设备,其实这个理解是错误的,其实 gpushare-device-plugin 会上报 8*16 个设备,每个设备会有假的 deviceID

    Allocate 详解 in kubelet -> DM

    这是一个很庞杂的问题,简单来说,kubelet 初始化时,有一个步骤是初始化 DM,DM 本身提供了 Allocate 函数供 kubelet 调用。而 DM 中,又会调用 DP 的 rpc Allocate 方法。

    kubelet 什么时候调用 DM 的 Allocate

    我们根据 kubernetes/pkg/kubelet/cm/devicemanager/types.go 中可知,kubelet 肯定通过某种方式调用了下面的函数;

    type Manager interface {
    	// Allocate configures and assigns devices to pods. The pods are provided
    	// through the pod admission attributes in the attrs argument. From the
    	// requested device resources, Allocate will communicate with the owning
    	// device plugin to allow setup procedures to take place, and for the
    	// device plugin to provide runtime settings to use the device (environment
    	// variables, mount points and device files). The node object is provided
    	// for the device manager to update the node capacity to reflect the
    	// currently available devices.
    	Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error
    
    	...
    }
    

    通过一系列分析,确实如同我们的猜测,只不过略显复杂:

    1. kubelet 启动后,klet.admitHandlers 调用 AddPodAdmitHandler 将一系列 PodAdmitHandler 加入 kubelet 的核心结构 klet.adminHandler
    2. 其中有一个叫 lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources) 也被加入;
    // 注意,PredicateAdmitHandler 实现了 Admit 方法,因此,PredicateAdmitHandler 的实例的指针就可以被 AddPodAdmitHandler 直接加入
    type PodAdmitHandler interface {
    // Admit evaluates if a pod can be admitted.
    Admit(attrs *PodAdmitAttributes) PodAdmitResult
    }
    
    1. 在 kubelet syncLoop 中的 syncLoopIteration 中,当出现新的 pod 申请时,会调用 handler.HandlePodUpdates(u.Pods)
    func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    	select {
    	case u, open := <-configCh:
    		// Update from a config source; dispatch it to the right handler
    		// callback.
    		if !open {
    			klog.Errorf("Update channel is closed. Exiting the sync loop.")
    			return false
    		}
    
    		switch u.Op {
    		case kubetypes.ADD:
    			klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
    			// After restarting, kubelet will get all existing pods through
    			// ADD as if they are new pods. These pods will then go through the
    			// admission process and *may* be rejected. This can be resolved
    			// once we have checkpointing.
    			// 看过源码,这里的 handler 就是 kl
    			handler.HandlePodAdditions(u.Pods)
    
    1. HandlePodAdditions 中调用 kl.canAdmitPod(activePods, pod)

    2. canAdmitPod 中:

    // 会调用 podAdmitHandler.Admit,因为 podAdmitHandler 是 interface,因此根据实际的类,调用不同的方法
    attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods}
    for _, podAdmitHandler := range kl.admitHandlers {
    	if result := podAdmitHandler.Admit(attrs); !result.Admit {
    		return false, result.Reason, result.Message
    	}
    }
    
    1. 对于 PredicateAdmitHandler.Admit,会调用 err = w.pluginResourceUpdateFunc(nodeInfo, attrs), pluginResourceUpdateFunc 是之前注册的 klet.containerManager.UpdatePluginResources

    2. klet.containerManager.UpdatePluginResources 最终调用:

    // 终于调用到了 DM 的 Allocate 方法
    func (cm *containerManagerImpl) UpdatePluginResources(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
    	return cm.deviceManager.Allocate(node, attrs)
    }
    

    上文的行文逻辑是从 pod 创建后,是如果最终调用到 Allocate 函数的流程,但是探索的过程往往是相反的,是从 Allocate 开始,看谁调用的它。

    这里最复杂的地方在于,UpdatePluginResources 是谁调用的,通过全局搜索,发现只有其被注册的地方,没有其被使用的地方。我们再来整理下思路:

    • UpdatePluginResources 被注册到了 PredicateAdmitHandlerPredicateAdmitHandler 又被加入到 klet.admitHandlers
    • 当有 pod 创建的事件发生时,会遍历 klet.admitHandlers 逐一调用 podAdmitHandler.Admit 方法
    • podAdmitHandler.Admit 中(实际上是 PredicateAdmitHandler.Admin)会调用 pluginResourceUpdateFunc 方法,这个方法正好是注册的 UpdatePluginResources

    关于 kubelet 启动后有哪几个大 loop,又有哪些小 loop,我们可以以后再分析,至少,创建 pod 时确实会调用 Allocate

    DM 的 Allocate 详解

    Allocate 代码 in kubernetes/pkg/kubelet/cm/devicemanager/manager.go

    func (m *ManagerImpl) Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
    	pod := attrs.Pod
    	err := m.allocatePodResources(pod)
    	if err != nil {
    		klog.Errorf("Failed to allocate device plugin resource for pod %s: %v", string(pod.UID), err)
    		return err
    	}
    
    	m.mutex.Lock()
    	defer m.mutex.Unlock()
    
    	// quick return if no pluginResources requested
    	if _, podRequireDevicePluginResource := m.podDevices[string(pod.UID)]; !podRequireDevicePluginResource {
    		return nil
    	}
    
    	m.sanitizeNodeAllocatable(node)
    	return nil
    }
    

    看完了 Allocate 函数和参数,感觉有三座大山:

    • err := m.allocatePodResources(pod)
      • m.allocDevices 填入预申请的 device
      • 请求 DP 的 allocate(rpc)方法将返回的信息填入 m.podDevices
    • m.podDevices
      • 保存 pod 使用 device 的记录结构,在 DM 中被管理
    • m.sanitizeNodeAllocatable(node)
      • 保存 newAllocatableResource 到 node 结构体中

    总结,Allocate 并不返回什么信息,仅仅是做好预分配,将 DM 结构体中的数据结构进行变更。注意,这其中会调用 DP 的 Allocate 方法,但是返回的内容也仅仅先填写到 m.podDevice,再啰嗦一句,该函数就是预分配资源。具体怎么用,什么时候用,还需要更进一步的分析。

    下面是对 Allocate 详细分析:

    m.allocatePodResources(pod) in kubernetes/pkg/kubelet/cm/devicemanager/manager.go

    func (m *ManagerImpl) allocatePodResources(pod *v1.Pod) error {
    	// 注意,devicesToReuse 一开始为空,什么内容都没有
    	devicesToReuse := make(map[string]sets.String)
    	// pod.Spec.InitContainers 参见:https://github.com/kubernetes/api/blob/master/core/v1/types.go
    	// InitContainers 是一个 pod 的最初的 container,锁定网络
    	for _, container := range pod.Spec.InitContainers {
    		// 将 `m.allocDevices` 填入预申请的 device
    		// 请求 DP 的 allocate(rpc)方法将返回的信息填入 `m.podDevices`
    		// 本质上说,到这一步,只改变了 DM 的两个数据结构的值而已,什么都没有变化
    		if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil {
    			return err
    		}
    		// in kubernetes/pkg/kubelet/cm/devicemanager/pod_devices.go
    		// 将 `m.podDevices` 中的 pod + container 的 resource 内容赋值到 `devicesToReuse`,注意,这个操作在 for 循环中,因此 devicesToReuse 会产生影响
    		m.podDevices.addContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
    	}
    	for _, container := range pod.Spec.Containers {
    		// 同上,是针对真正的 pod 中的 container
    		if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil {
    			return err
    		}
    		m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
    	}
    	return nil
    }
    

    allocateContainerResources in kubernetes/pkg/kubelet/cm/devicemanager/manager.go

    func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container, devicesToReuse map[string]sets.String) error {
    	// 分配的资源在 allocDevices,且已经写入了 m.allocatedDevices
    	allocDevices, err := m.devicesToAllocate(podUID, contName, resource, needed, devicesToReuse[resource])
    	// 取出 resouce(resourceName 为 aliyun.com/gpu-mem)的 GRPC server 地址
    	eI, ok := m.endpoints[resource]
    	// 调用 DP 的 allocate
    	resp, err := eI.e.allocate(devs)
    	// 将结果写入 m.podDevices
    	m.podDevices.insert(podUID, contName, resource, allocDevices, resp.ContainerResponses[0])
    

    devicesToAllocate,返回是需要被分配的 device。

    • m.podDevices.containerDevices 是每一个 container 使用的资源,这个和下面三个 device 互不影响
    • m.healthyDevices 是 DM 保存的健康的 device
    • m.unhealthyDevices 是 DM 保存的不健康的 device
    • m.allocatedDevices 是 DM 保存的已经分配了的资源
    func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, required int, reusableDevices sets.String) (sets.String, error) {
        // containerDevices in `kubernetes/pkg/kubelet/cm/devicemanager/pod_devices.go`
    	// 该函数把需要分配的 device list 返回出去,
    	// 首先尝试使用 reusableDevices,如果足够了,就返回,否则还会额外的返回全新的 device
    	// 如果没有必要返回 device 列表,则返回 nil
    	devices := m.podDevices.containerDevices(podUID, contName, resource)
    	// reusableDevices
    	// Allocates from reusableDevices list first.
    	for device := range reusableDevices {
    		devices.Insert(device)
    		needed--
    		if needed == 0 {
    			return devices, nil
    		}
    	}
    
    	for _, device := range allocated {
    		m.allocatedDevices[resource].Insert(device)
    		//
    		devices.Insert(device)
    	}
    	return devices, nil
    

    sanitizeNodeAllocatable in kubernetes/pkg/kubelet/cm/devicemanager/manager.go

    func (m *ManagerImpl) sanitizeNodeAllocatable(node *schedulernodeinfo.NodeInfo) {
    	var newAllocatableResource *schedulernodeinfo.Resource
    	allocatableResource := node.AllocatableResource()
    	if allocatableResource.ScalarResources == nil {
    		allocatableResource.ScalarResources = make(map[v1.ResourceName]int64)
    	}
    	for resource, devices := range m.allocatedDevices {
    		needed := devices.Len()
    		quant, ok := allocatableResource.ScalarResources[v1.ResourceName(resource)]
    		if ok && int(quant) >= needed {
    			continue
    		}
    		// Needs to update nodeInfo.AllocatableResource to make sure
    		// NodeInfo.allocatableResource at least equal to the capacity already allocated.
    		if newAllocatableResource == nil {
    			newAllocatableResource = allocatableResource.Clone()
    		}
    		newAllocatableResource.ScalarResources[v1.ResourceName(resource)] = int64(needed)
    	}
    	// 将 allocatedDevices 写入 node 结构中,即:n.allocatableResource = allocatableResource
    	if newAllocatableResource != nil {
    		node.SetAllocatableResource(newAllocatableResource)
    	}
    }
    

    Allocate 详解 in DM -> DP

    参数部分相对容易,只要查看 kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1/api.proto 即可

    rpc Allocate(AllocateRequest) returns (AllocateResponse) {}
    
    // 传入的结构体 AllocateRequest,简言之就是一堆 deviceID
    message AllocateRequest {
    	repeated ContainerAllocateRequest container_requests = 1;
    }
    
    message ContainerAllocateRequest {
    	repeated string devicesIDs = 1;
    }
    
    // 返回的结构体 AllocateResponse,简言之,是传入容器的环境变量、文件夹映射、设备映射,和 annotations,其中 mount 和 device 均是 container_path、host_path、permission。
    
    message AllocateResponse {
    	repeated ContainerAllocateResponse container_responses = 1;
    }
    
    message ContainerAllocateResponse {
      	// List of environment variable to be set in the container to access one of more devices.
    	map<string, string> envs = 1;
    	// Mounts for the container.
    	repeated Mount mounts = 2;
    	// Devices for the container.
    	repeated DeviceSpec devices = 3;
    	// Container annotations to pass to the container runtime
    	map<string, string> annotations = 4;
    }
    
    message Mount {
    	// Path of the mount within the container.
    	string container_path = 1;
    	// Path of the mount on the host.
    	string host_path = 2;
    	// If set, the mount is read-only.
    	bool read_only = 3;
    }
    
    // DeviceSpec specifies a host device to mount into a container.
    message DeviceSpec {
        // Path of the device within the container.
        string container_path = 1;
        // Path of the device on the host.
        string host_path = 2;
        // Cgroups permissions of the device, candidates are one or more of
        // * r - allows container to read from the specified device.
        // * w - allows container to write to the specified device.
        // * m - allows container to create device files that do not yet exist.
        string permissions = 3;
    }
    
    

    有一个核心的问题,DM 通过 DP 的 ListAndWatch 接口查到的是几个 device?如果这个机器上插了 8 GPU 16GB 内存。

    我想大家和我一样,认为是 8 个 GPU 设备,其实这个理解是错误的,其实 gpushare-device-plugin 会上报 8*16 个设备,每个设备会有假的 deviceID:

    // gpushare-device-plugin/pkg/gpu/nvidia/nvidia.go func getDevices()
    fakeID := generateFakeDeviceID(d.UUID, j)
    devs = append(devs, &pluginapi.Device{
    				ID:     fakeID,
    				Health: pluginapi.Healthy,
    }
    

    明确这个问题后,可以再看看 DP 以 gpushare-device-plugin 为例是怎么处理的:

    // Allocate which return list of devices.
    func (m *NvidiaDevicePlugin) Allocate(ctx context.Context,
    	reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
    	// 猜测:`Allocate` 的流程中,分配卡就变得容易,我们可以想象,一定是把多个 faked device ID 转化为一张真正的卡,如果出现了 faked device IDs 不属于同一张卡,那么一定是调度出现了问题!
    	// 实际上,DP 从来不做调度管理,是根据 scheduler 增加 annotation 来知道到底 faked device ID 转化为哪一张真正的卡
    	responses := pluginapi.AllocateResponse{}
    
    	log.Infoln("----Allocating GPU for gpu mem is started----")
    	var (
    		podReqGPU uint
    		found     bool
    		assumePod *v1.Pod
    	)
    
    	// podReqGPU = uint(0)
    	// 注意,Allocate 是 Pod 的请求,不是 Pod 下面的 container 的独立请求,因此,podReqGPU 表示,该 pod 一共需要多少 faked 的卡
    	for _, req := range reqs.ContainerRequests {
    		podReqGPU += uint(len(req.DevicesIDs))
    	}
    	// 请求几个 faked GPU 卡
    	log.Infof("RequestPodGPUs: %d", podReqGPU)
    
    	m.Lock()
    	defer m.Unlock()
    	log.Infoln("checking...")
    	// 需要仔细分析
    	// 通过查询 k8s api,获得所有的处于 pending 状态的,并且需要被该 DP 所在 node 处理的 pod
    	// 确实是 getCandidatePods
    	pods, err := getCandidatePods()
    	if err != nil {
    		log.Infof("invalid allocation requst: Failed to find candidate pods due to %v", err)
    		return buildErrResponse(reqs, podReqGPU), nil
    	}
    
    	if log.V(4) {
    		for _, pod := range pods {
    			log.Infof("Pod %s in ns %s request GPU Memory %d with timestamp %v",
    				pod.Name,
    				pod.Namespace,
    				getGPUMemoryFromPodResource(pod),
    				getAssumeTimeFromPodAnnotation(pod))
    		}
    	}
    
    	for _, pod := range pods {
    		// 看下每一个 pod 需要的内存和本次请求的作比较,如果相同,则假设就是这个 pod 发来的请求(不够严谨?)
    		// 我想,从 DM <-> DP 直接通信的参数本意上来说,DP 不需要知道我是为谁(哪个 pod)服务,但是为什么 gpushare-device-plugin 需要知道呢?
    		if getGPUMemoryFromPodResource(pod) == podReqGPU {
    			log.Infof("Found Assumed GPU shared Pod %s in ns %s with GPU Memory %d",
    				pod.Name,
    				pod.Namespace,
    				podReqGPU)
    			assumePod = pod
    			found = true
    			break
    		}
    	}
    
    	if found {
    		// 查找 Pod 中 annotation 的 `ALIYUN_COM_GPU_MEM_IDX` 内容是什么,比如 0 代表用第 0 块卡
    		// 这个我猜测是在调度的时候被打上的
    		// 否则,分配失败会报错
    		id := getGPUIDFromPodAnnotation(assumePod)
    		if id < 0 {
    			log.Warningf("Failed to get the dev ", assumePod)
    		}
    
    		candidateDevID := ""
    		if id >= 0 {
    			ok := false
    			// 得到设备名称
    			candidateDevID, ok = m.GetDeviceNameByIndex(uint(id))
    			if !ok {
    				log.Warningf("Failed to find the dev for pod %v because it's not able to find dev with index %d",
    					assumePod,
    					id)
    				id = -1
    			}
    		}
    
    		if id < 0 {
    			return buildErrResponse(reqs, podReqGPU), nil
    		}
    
    		// 1. Create container requests
    		// 构造返回的内容
    		for _, req := range reqs.ContainerRequests {
    			// reqGPU 是每一个 container 需要的卡,和 podReqGPU 要区分开
    			reqGPU := uint(len(req.DevicesIDs))
    			// 传入的环境变量为:
    			/*
    			NVIDIA_VISIBLE_DEVICES: 也许是GPU0
    			ALIYUN_COM_GPU_MEM_IDX: 0 (GPU index)
    			ALIYUN_COM_GPU_MEM_POD: 10 (POD 总 GPU memory)
    			ALIYUN_COM_GPU_MEM_CONTAINER: 5 (POD 下面的其中一个 container 的 GPU memory)
    			ALIYUN_COM_GPU_MEM_DEV: 16 (这个卡总共能提供多少资源)
    			*/
    			response := pluginapi.ContainerAllocateResponse{
    				Envs: map[string]string{
    					envNVGPU:               candidateDevID,
    					EnvResourceIndex:       fmt.Sprintf("%d", id),
    					EnvResourceByPod:       fmt.Sprintf("%d", podReqGPU),
    					EnvResourceByContainer: fmt.Sprintf("%d", reqGPU),
    					EnvResourceByDev:       fmt.Sprintf("%d", getGPUMemory()),
    				},
    			}
    			responses.ContainerResponses = append(responses.ContainerResponses, &response)
    		}
    
    		// 2. Update Pod spec
    		// 直接改 pod 中的 annotations,生成新的数据结构
    		// ALIYUN_COM_GPU_MEM_ASSIGNED = true
    		// ALIYUN_COM_GPU_MEM_ASSUME_TIME = 时间戳
    		// 感觉很暴力!
    		newPod := updatePodAnnotations(assumePod)
    		// 使之生效
    		_, err = clientset.CoreV1().Pods(newPod.Namespace).Update(newPod)
    		if err != nil {
    			// 如果设置失败,有重试机制
    			// the object has been modified; please apply your changes to the latest version and try again
    			if err.Error() == OptimisticLockErrorMsg {
    				// retry
    				pod, err := clientset.CoreV1().Pods(assumePod.Namespace).Get(assumePod.Name, metav1.GetOptions{})
    				if err != nil {
    					log.Warningf("Failed due to %v", err)
    					return buildErrResponse(reqs, podReqGPU), nil
    				}
    				newPod = updatePodAnnotations(pod)
    				_, err = clientset.CoreV1().Pods(newPod.Namespace).Update(newPod)
    				if err != nil {
    					log.Warningf("Failed due to %v", err)
    					return buildErrResponse(reqs, podReqGPU), nil
    				}
    			} else {
    				log.Warningf("Failed due to %v", err)
    				return buildErrResponse(reqs, podReqGPU), nil
    			}
    		}
    
    	} else {
    		log.Warningf("invalid allocation requst: request GPU memory %d can't be satisfied.",
    			podReqGPU)
    		// return &responses, fmt.Errorf("invalid allocation requst: request GPU memory %d can't be satisfied", reqGPU)
    		return buildErrResponse(reqs, podReqGPU), nil
    	}
    
    	log.Infof("new allocated GPUs info %v", &responses)
    	log.Infoln("----Allocating GPU for gpu mem is ended----")
    	// // Add this to make sure the container is created at least
    	// currentTime := time.Now()
    
    	// currentTime.Sub(lastAllocateTime)
    
    	// 将 1. Create container requests 中的内容返回了
    	return &responses, nil
    }
    

    我们需要总结一下这个流程:

    1. 猜测本次 Allocate 请求是哪个 Pod 提出的
    2. 通过 API 调用,获得 Pod 的 Spec 配置,将多个 faked device ID 转化为一张真正的 GPU 卡
    3. 组织返回的信息,包括多个(Pod 中有几个 container 就有几套):
    NVIDIA_VISIBLE_DEVICES: 也许是GPU0
    ALIYUN_COM_GPU_MEM_IDX: 0 (GPU index)
    ALIYUN_COM_GPU_MEM_POD: 10 (POD 总 GPU memory)
    ALIYUN_COM_GPU_MEM_CONTAINER: 5 (POD 下面的其中一个 container 的 GPU memory)
    ALIYUN_COM_GPU_MEM_DEV: 16 (这个卡总共能提供多少资源)
    
    1. 直接修改 Pod 中的 annotations
    ALIYUN_COM_GPU_MEM_ASSIGNED = true
    ALIYUN_COM_GPU_MEM_ASSUME_TIME = 时间戳
    

    至于 ALIYUN_COM_GPU_MEM_ASSIGNED 等有什么用,我们会另起文章分析。

    Device 异常,DP 和 DM 如何反应?

    首先,一定是 DP 先发现异常的。以 gpushare-device-plugin 为例,在启动的时候,会增加监控 gpushare-device-plugin/pkg/gpu/nvidia/server.go

    func (m *NvidiaDevicePlugin) Start() error {
    	...
    	go m.healthcheck()
    	...
    	}
    
    func (m *NvidiaDevicePlugin) healthcheck() {
    	ctx, cancel := context.WithCancel(context.Background())
    
    	var xids chan *pluginapi.Device
    	if m.healthCheck {
    		xids = make(chan *pluginapi.Device)
    		// 监控所有的 dev
    		go watchXIDs(ctx, m.devs, xids)
    	}
    
    	for {
    		select {
    		case <-m.stop:
    			cancel()
    			return
    		case dev := <-xids:
    			// 如果异常,则调用 unhealthy 方法,该方法向信号参数 m.health 写 dev 的内容
    			m.unhealthy(dev)
    		}
    	}
    }
    
    func watchXIDs(ctx context.Context, devs []*pluginapi.Device, xids chan<- *pluginapi.Device) {
    	通过 nvidia 库 nvml RegisterEventForDevice,并监听是否有异常出现,有异常则往管道里写错误,一个用不停止的大循环
    

    其次,如果 m.unhealthy 出现了异常设备,如何上报?或者是 DM 定期轮询 ListAndWatch?

    通过 ListAndWatch 代码 gpushare-device-plugin/pkg/gpu/nvidia/server.go 可知,实际上都不是!

    func (m *NvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
    	s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs})
    
    	// 之前未曾注意到这个循环,当 DM 在 DP register 后,会立刻调用该函数,当正常返回后,该函数进入一个死循环
    	// 当 m.health 有内容了,立刻向 DM 发送出错的信息,把所有卡都置为失败,这个确实值得商榷
    	for {
    		select {
    		case <-m.stop:
    			return nil
    		case d := <-m.health:
    			// FIXME: there is no way to recover from the Unhealthy state.
    			d.Health = pluginapi.Unhealthy
    			s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs})
    		}
    	}
    }
    

    最后,DM 真的有一个进程一直在等待 ListAndWatchResponse 么?

    简言之是的,因为 DM 调用 ListAndResponse 后,会从 stream 流中不断的获得返回的内容,没有新的,则会卡在 stream.Recv() 中,当有异常的时候,调用 callback 函数后,会 m.unhealthyDevices[resourceName].Insert(dev.ID)

    什么时候才会真正获取 Allocate 后的环境变量等内容

    我们知道 kubelet 调用 DM 的 Allocate 仅仅是将资源准备好(设置几个 dict 而已)(资源是在 DM 中管理的,DP 不记录资源的使用),但是什么时候 kubelet 才真正让这些资源被用起来呢?

    分析的过程:

    1. 我就知道 func (m *ManagerImpl) GetDeviceRunContainerOptions 这个函数是用来返回 options 的,其在 kubernetes/pkg/kubelet/cm/devicemanager/manager.go
    2. func (cm *containerManagerImpl) GetResources in kubernetes/pkg/kubelet/cm/container_manager_linux.go 调用了,该函数返回所有的 options
    3. func (kl *Kubelet) GenerateRunContainerOptions in kubernetes/pkg/kubelet/kubelet_pods.go 该函数调用了
    4. func (m *kubeGenericRuntimeManager) generateContainerConfig in kubernetes/pkg/kubelet/kuberuntime/kuberuntime_container.go
    5. func (m *kubeGenericRuntimeManager) startContainer in kubernetes/pkg/kubelet/kuberuntime/kuberuntime_container.go
    6. func (m *kubeGenericRuntimeManager) SyncPod in kubernetes/pkg/kubelet/kuberuntime/kuberuntime_manager.go
    7. func (kl *Kubelet) syncPod in kubernetes/pkg/kubelet/kubelet.go
    8. func (m *manager) Start() in kubernetes/pkg/kubelet/status/status_manager.go 当有 m.podStatusChannel 事件时,SyncPod 被触发
    9. func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { in kubernetes/pkg/kubelet/kubelet.go

    Allocate 应该在先,然后才是 GetDeviceRunContainerOptions,分析的过程中并没有明确的先后顺序,作为下次分析的主题

    扩展

    nvidia-docker 能够使用 GPU 的原理

    准备作为一个主题,下次分享 参考

    k8s 启动带 GPU 的容器的原理

    准备作为一个主题,下次分享 参考

  • 相关阅读:
    eas之排序接口
    eas中删除原来的监听事件添加新的监听事件
    eas之获取单据编码规则
    eas更改用户组织范围和业务组织范围
    ECMAScript 6 文档
    java网络编程介绍
    计算机网络-总结(三)
    Lombok包下常用注解
    计算机网络介绍(二)
    计算机网络介绍(一)
  • 原文地址:https://www.cnblogs.com/oolo/p/11672720.html
Copyright © 2011-2022 走看看