  kubelet source code analysis: starting a Pod

    The previous article described how the kubelet, at startup, calls kubelet.Run, whose core is the call to kubelet.syncLoop. syncLoop is a loop that performs a number of checks and synchronization operations; one of them continuously listens for Pod add/update/delete events. Once the scheduler assigns a Pod to a node, the corresponding event fires inside kubelet.syncLoop, and after a series of steps the Pod ends up running normally.

    kubelet.syncLoop

    kubelet.syncLoop	/pkg/kubelet/kubelet.go
    |--kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh)
    	|--u, open := <-configCh
    	|--handler.HandlePodAdditions(u.Pods), i.e. Kubelet.HandlePodAdditions
    		|--sort.Sort(sliceutils.PodsByCreationTime(pods))	
    		|--kl.handleMirrorPod(pod, start)
    			|--kl.dispatchWork
    		|--kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
    			|--kl.podWorkers.UpdatePod, i.e. podWorkers.UpdatePod	/pkg/kubelet/pod_workers.go
    				|--p.managePodLoop
    					|--p.syncPodFn
    

    syncLoop

    Even when there are no pod configuration updates, the kubelet still synchronizes and cleans up pods on a schedule. syncLoop calls syncLoopIteration continuously in a for loop; if an iteration hits a serious runtime error, the kubelet records it in runtimeState and backs off exponentially (capped at 5 seconds) before looping again.

    func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
        // syncTicker checks every second whether any pod workers need to sync
        syncTicker := time.NewTicker(time.Second)
        defer syncTicker.Stop()
        // housekeepingTicker checks every two seconds whether any pods need cleanup
        housekeepingTicker := time.NewTicker(housekeepingPeriod)
        defer housekeepingTicker.Stop()
        // plegCh delivers pod lifecycle events from the PLEG
        plegCh := kl.pleg.Watch()
    	...
    	for {
    		if err := kl.runtimeState.runtimeErrors(); err != nil {
    			klog.Errorf("skipping pod synchronization - %v", err)
    			// exponential backoff
    			time.Sleep(duration)
    			duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
    			continue
    		}
    		// reset backoff if we have a success
    		duration = base
    		...
    		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
    			break
    		}
    		...
    	}
    	...
    }
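    The duration, base, max and factor used by the backoff above are initialized in the elided part of syncLoop. As a sketch of how they are set up (values as found in the kubelet sources this walkthrough follows; check your version):

    const (
    	// backoff settings for when the container runtime is reporting errors
    	base   = 100 * time.Millisecond
    	max    = 5 * time.Second
    	factor = 2 // backoff doubles on every consecutive runtime error
    )
    duration := base // reset back to base after any successful iteration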
    

    syncLoopIteration

    syncLoopIteration selects over several channels and hands any message it receives to the handler. For pod creation the relevant channel is configCh, which carries pod changes (add, update, delete) from the three configuration sources: file, http, and apiserver. The other channels include syncCh, which syncs pods once per second, and housekeepingCh, which checks every two seconds whether pods need to be cleaned up, among others.

    func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    	select {
    	case u, open := <-configCh: // pod changes from the three sources (file, http, apiserver)
    		....
    		switch u.Op {
    		case kubetypes.ADD:
    			klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
    			// After restarting, kubelet will get all existing pods through
    			// ADD as if they are new pods. These pods will then go through the
    			// admission process and *may* be rejected. This can be resolved
    			// once we have checkpointing.
    			handler.HandlePodAdditions(u.Pods)
    		.....
    		}
    	case <-syncCh: // 1-second ticker: periodic pod sync
    	....
    	case update := <-kl.livenessManager.Updates(): // liveness probe results
    	....
    	case <-housekeepingCh: // 2-second ticker: pod cleanup (housekeeping)
    	....
    	}
    	return true
    }
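    For reference, the payload carried on configCh is kubetypes.PodUpdate, defined in /pkg/kubelet/types/pod_update.go. Roughly (the exact field set may differ slightly between versions):

    // PodUpdate describes one change to the set of pods the kubelet tracks.
    type PodUpdate struct {
    	Pods   []*v1.Pod    // the pods affected by this update
    	Op     PodOperation // e.g. ADD, UPDATE, DELETE, REMOVE, RECONCILE
    	Source string       // "file", "http" or "api"
    }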
    

    HandlePodAdditions handles pod addition events

    func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
    	start := kl.clock.Now()
    	sort.Sort(sliceutils.PodsByCreationTime(pods)) // sort pods by creation time so the earliest-created pod is handled first
    	for _, pod := range pods {
    		existingPods := kl.podManager.GetPods()

    		// Add the pod to podManager; statusManager, volumeManager and the
    		// runtime manager all rely on podManager as their source of truth.
    		kl.podManager.AddPod(pod)

    		// Handle mirror pods (the apiserver-side representation of static pods).
    		// Internally this also calls kl.dispatchWork; it mainly skips the
    		// admission (rejection) checks below.
    		if kubetypes.IsMirrorPod(pod) {
    			kl.handleMirrorPod(pod, start)
    			continue
    		}

    		if !kl.podIsTerminated(pod) {
    			// Only go through the admission process if the pod is not
    			// terminated.

    			// We failed pods that we rejected, so activePods include all admitted
    			// pods that are alive.
    			activePods := kl.filterOutTerminatedPods(existingPods)

    			// Check if we can admit the pod (i.e. run it on this node); if not, reject it.
    			if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
    				kl.rejectPod(pod, reason, message)
    				continue
    			}
    		}
    		....
    		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
    		.....
    	}
    }
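    The sort on the first line works because sliceutils.PodsByCreationTime implements sort.Interface over the pods' creation timestamps. Roughly (from /pkg/kubelet/util/sliceutils; may vary by version):

    // PodsByCreationTime sorts pods by CreationTimestamp, oldest first.
    type PodsByCreationTime []*v1.Pod

    func (s PodsByCreationTime) Len() int      { return len(s) }
    func (s PodsByCreationTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
    func (s PodsByCreationTime) Less(i, j int) bool {
    	return s[i].CreationTimestamp.Before(&s[j].CreationTimestamp)
    }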
    

    UpdatePod

    UpdatePod runs managePodLoop on a goroutine. The podUpdates map records whether a goroutine (and its channel) has already been created for this pod, and the isWorking map records whether that goroutine is currently busy; if it is idle, the options are sent down the channel so that managePodLoop executes.

    func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
    	pod := options.Pod
    	uid := pod.UID
    	var podUpdates chan UpdatePodOptions
    	var exists bool
    	if podUpdates, exists = p.podUpdates[uid]; !exists {
    		// first update for this pod: create its channel and worker goroutine
    		podUpdates = make(chan UpdatePodOptions, 1)
    		p.podUpdates[uid] = podUpdates
    		go func() {
    			defer runtime.HandleCrash()
    			p.managePodLoop(podUpdates)
    		}()
    	}
    	if !p.isWorking[uid] {
    		p.isWorking[uid] = true
    		podUpdates <- *options
    	} else {
    		...
    	}
    	....
    }
    

    managePodLoop

    Execution eventually reaches the syncPodFn call. syncPodFn is a field of podWorkers, injected when podWorkers is constructed; in practice it is the kubelet.syncPod method.

    func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
    	...
    			err = p.syncPodFn(syncPodOptions{
    				mirrorPod:      update.MirrorPod,
    				pod:            update.Pod,
    				podStatus:      status,
    				killPodOptions: update.KillPodOptions,
    				updateType:     update.UpdateType,
    			})
    	...
    }
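    The argument type mirrors the fields above; syncPodOptions is defined in /pkg/kubelet/kubelet.go and looks roughly like this (version-dependent):

    // syncPodOptions bundles everything syncPodFn (i.e. kubelet.syncPod) needs.
    type syncPodOptions struct {
    	mirrorPod      *v1.Pod                  // the pod's mirror pod, if it is a static pod
    	pod            *v1.Pod                  // the pod to sync
    	podStatus      *kubecontainer.PodStatus // status from the runtime cache
    	updateType     kubetypes.SyncPodType    // create, update, sync or kill
    	killPodOptions *KillPodOptions          // only set for kill operations
    }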
    

    Pod sync (Kubelet.syncPod)

    1 If this is a pod creation event, record some pod-latency metrics;
    2 Generate a v1.PodStatus object; a pod's phase is one of Pending, Running, Succeeded, Failed or Unknown
    3 Once the PodStatus is generated, send it to the pod status manager
    4 Run a series of admission handlers to make sure the pod has the correct security permissions
    5 The kubelet creates the cgroups for the pod
    6 Create the pod's data directories: <root-dir>/pods/<podUID> (by default under /var/lib/kubelet), plus $poddir/volumes and $poddir/plugins
    7 The volume manager waits for the volumes to attach and mount
    8 Fetch the secrets named in Spec.ImagePullSecrets from the apiserver, used for pulling the container images
    9 The container runtime creates the containers
    Since the code is long, only the key method and function calls are quoted; the code lives in /pkg/kubelet/kubelet.go

    func (kl *Kubelet) syncPod(o syncPodOptions) error {
    	// 1. If this is a pod creation event, record pod-latency metrics.
    	// Record pod worker start latency if being created
    	// TODO: make pod workers record their own latencies
    	if updateType == kubetypes.SyncPodCreate {
    		if !firstSeenTime.IsZero() {
    			// This is the first time we are syncing the pod. Record the latency
    			// since kubelet first saw the pod if firstSeenTime is set.
    			metrics.PodWorkerStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
    		} else {
    			klog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
    		}
    	}

    	// 2. Generate a v1.PodStatus object.
    	apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
    	// 4. Run a series of admission handlers to make sure the pod has the
    	// correct security permissions.
    	runnable := kl.canRunPod(pod)
    	....
    	// 3. Once generated, the PodStatus is sent to the pod status manager.
    	kl.statusManager.SetPodStatus(pod, apiPodStatus)
    	// 5. The kubelet creates the cgroups for the pod.
    	if !kl.podIsTerminated(pod) {
    		if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
    			if !pcm.Exists(pod) {
    				if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
    					klog.V(2).Infof("Failed to update QoS cgroups while syncing pod: %v", err)
    				}
    				if err := pcm.EnsureExists(pod); err != nil {
    					kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
    					return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
    				}
    			}
    		}
    	}

    	// 6. Create the pod's data directories.
    	// Make data directories for the pod
    	if err := kl.makePodDataDirs(pod); err != nil {
    		kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
    		klog.Errorf("Unable to make pod data directories for pod %q: %v", format.Pod(pod), err)
    		return err
    	}

    	// Volume manager will not mount volumes for terminated pods
    	if !kl.podIsTerminated(pod) {
    		// 7. The volume manager waits for the volumes to attach; the actual
    		// mounting is performed elsewhere.
    		// Wait for volumes to attach/mount
    		if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
    			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
    			klog.Errorf("Unable to attach or mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
    			return err
    		}
    	}

    	// 8. Fetch the secrets named in Spec.ImagePullSecrets from the apiserver;
    	// some pods carry ImagePullSecrets for logging in to registries to pull images.
    	// Fetch the pull secrets for the pod
    	pullSecrets := kl.getPullSecretsForPod(pod)

    	// 9. The container runtime creates the containers.
    	// Call the container runtime's SyncPod callback
    	result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
    	...
    }
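    Step 6's makePodDataDirs is small enough to quote nearly in full; roughly (from /pkg/kubelet/kubelet_pods.go, modulo version drift) it just creates the three directories under the kubelet root dir:

    // makePodDataDirs creates the pod dir, its volumes dir and its plugins dir.
    func (kl *Kubelet) makePodDataDirs(pod *v1.Pod) error {
    	uid := pod.UID
    	if err := os.MkdirAll(kl.getPodDir(uid), 0750); err != nil && !os.IsExist(err) {
    		return err
    	}
    	if err := os.MkdirAll(kl.getPodVolumesDir(uid), 0750); err != nil && !os.IsExist(err) {
    		return err
    	}
    	if err := os.MkdirAll(kl.getPodPluginsDir(uid), 0750); err != nil && !os.IsExist(err) {
    		return err
    	}
    	return nil
    }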
    

    The runtime creates the containers (kubeGenericRuntimeManager.SyncPod)

    1 Compute the sandbox and container changes
    2 If the sandbox has changed, kill the pod
    3 Kill any running containers in the pod that should not be kept
    4 Create the sandbox if one is needed
    5 Create the ephemeral containers
    6 Create the init containers
    7 Create the application containers
    The code lives in /pkg/kubelet/kuberuntime/kuberuntime_manager.go

    func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
    	// Step 1: Compute sandbox and container changes.
    	podContainerChanges := m.computePodActions(pod, podStatus)
    	// Step 2: Kill the pod if the sandbox has changed.
    	if podContainerChanges.KillPod {
    		killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
    	} else {
    		// Step 3: kill any running containers in this pod which are not to keep.
    		for containerID, containerInfo := range podContainerChanges.ContainersToKill {
    			if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
    			}
    		}
    	}
    	// Step 4: Create a sandbox for the pod if necessary.
    	podSandboxID := podContainerChanges.SandboxID
    	if podContainerChanges.CreateSandbox {
    		podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
    	}
    	// Step 5: start ephemeral containers
    	if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {
    		for _, idx := range podContainerChanges.EphemeralContainersToStart {
    			start("ephemeral container", ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
    		}
    	}
    	// Step 6: start the init container.
    	if container := podContainerChanges.NextInitContainerToStart; container != nil {
    		// Start the next init container.
    		if err := start("init container", containerStartSpec(container)); err != nil {
    			return
    		}
    	}
    
    	// Step 7: start containers in podContainerChanges.ContainersToStart.
    	for _, idx := range podContainerChanges.ContainersToStart {
    		start("container", containerStartSpec(&pod.Spec.Containers[idx]))
    	}
    	return
    }
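    computePodActions returns a podActions value whose fields drive each step above; approximately (same file; fields vary somewhat by version):

    // podActions records what SyncPod has decided to do for this pod.
    type podActions struct {
    	KillPod                    bool   // stop the sandbox and all containers
    	CreateSandbox              bool   // a (new) sandbox must be created
    	SandboxID                  string // the sandbox to start containers in
    	Attempt                    uint32 // sandbox creation attempt number
    	NextInitContainerToStart   *v1.Container
    	ContainersToStart          []int // indexes into pod.Spec.Containers
    	ContainersToKill           map[kubecontainer.ContainerID]containerToKillInfo
    	EphemeralContainersToStart []int // indexes into pod.Spec.EphemeralContainers
    }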
    
    Creating the sandbox

    1 Pull the sandbox image
    2 Create the sandbox container
    3 Create the sandbox checkpoint
    4 Start the sandbox container; if this fails, it is left to kubelet GC to clean up
    5 For hostNetwork pods, return here; otherwise have CNI wire up the network
    Reaching the code that finally creates the sandbox takes several layers of calls, starting from kubeGenericRuntimeManager.SyncPod:

    m.createPodSandbox	/pkg/kubelet/kuberuntime/kuberuntime_manager.go
    |--m.runtimeService.RunPodSandbox	/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go
    	|--r.runtimeClient.RunPodSandbox	(runtimeService.RunPodSandbox is implemented by remoteRuntimeService)	/pkg/kubelet/cri/remote/remote_runtime.go
    		|--dockerService.RunPodSandbox	/pkg/kubelet/dockershim/docker_sandbox.go
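    The remoteRuntimeService hop is just a gRPC call across the CRI; condensed (signatures as in this era of the code, so treat it as a sketch):

    func (r *remoteRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
    	// Sandbox operations get a longer timeout than ordinary runtime requests.
    	ctx, cancel := getContextWithTimeout(r.timeout * 2)
    	defer cancel()

    	// The real work happens on the CRI server side (dockershim below).
    	resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
    		Config:         config,
    		RuntimeHandler: runtimeHandler,
    	})
    	if err != nil {
    		return "", err
    	}
    	return resp.PodSandboxId, nil
    }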
    

    A condensed version of dockerService.RunPodSandbox:

    func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) {
    	// Step 1: Pull the image for the sandbox.
    	if err := ensureSandboxImageExists(ds.client, image); err != nil {
    		return nil, err
    	}
    	// Step 2: Create the sandbox container.
    	createConfig, err := ds.makeSandboxDockerConfig(config, image)
    	createResp, err := ds.client.CreateContainer(*createConfig)
    	// Step 3: Create Sandbox Checkpoint.
    	if err = ds.checkpointManager.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil {
    		return nil, err
    	}
    	// Step 4: Start the sandbox container.
    	err = ds.client.StartContainer(createResp.ID)
    	// Step 5: Setup networking for the sandbox.
    	if config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtimeapi.NamespaceMode_NODE {
    		return resp, nil
    	}
    	err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID, config.Annotations, networkOptions)
    	...
    	return resp, nil
    }
    
    CNI wires up the network

    The kubelet reads the CNI configuration files under /etc/cni/net.d and executes the CNI plugin binaries under /opt/cni/bin.
    The CNI plugin creates the veth pair, allocates an IP, configures the IP on the interface, and so on.
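    Purely as an illustration, a minimal bridge-plugin configuration file under /etc/cni/net.d might look like the following (the network name, subnet and plugin choice are made-up example values, not anything the kubelet mandates):

    {
      "cniVersion": "0.4.0",
      "name": "examplenet",
      "type": "bridge",
      "bridge": "cni0",
      "isGateway": true,
      "ipMasq": true,
      "ipam": {
        "type": "host-local",
        "subnet": "10.244.0.0/16",
        "routes": [{ "dst": "0.0.0.0/0" }]
      }
    }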

    Creating the ephemeral, init and application containers

    1 Pull the image
    2 Create the container
    3 Start the container
    4 Execute the post-start hook
    All three container types go through the same local start function defined inside kubeGenericRuntimeManager.SyncPod; only the arguments differ with the container type (a condensed sketch of that local function follows).
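    Roughly, the local function looks like this (condensed from kuberuntime_manager.go; details vary across versions):

    start := func(typeName string, spec *startSpec) error {
    	startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)
    	result.AddSyncResult(startContainerResult)

    	// Respect the per-container crash-loop backoff before starting.
    	isInBackOff, msg, err := m.doBackOff(pod, spec.container, podStatus, backOff)
    	if isInBackOff {
    		startContainerResult.Fail(err, msg)
    		return err
    	}

    	// Delegate the pull/create/start/post-start sequence to startContainer.
    	if msg, err := m.startContainer(podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
    		startContainerResult.Fail(err, msg)
    		return err
    	}
    	return nil
    }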

    The local function in turn calls kubeGenericRuntimeManager.startContainer, condensed below; the code is at /pkg/kubelet/kuberuntime/kuberuntime_container.go

    func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
    	// Step 1: pull the image.
    	imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets, podSandboxConfig)
    	// Step 2: create the container.
    	containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
    	// Step 3: start the container.
    	err = m.runtimeService.StartContainer(containerID)
    	// Step 4: execute the post start hook.
    	if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
    		msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
    	}
    	...
    	return "", nil
    }
    

    Summary

    Starting from the kubelet's main loop, this article walked through the pod startup process: updating status, assigning cgroups, creating the container data directories, waiting for volumes to mount, injecting image pull secrets, creating the sandbox, calling CNI to wire up the network, starting the ephemeral, init and application containers, and executing the postStart lifecycle hook.

    If you are interested, see the other articles in my "k8s source code tour" series:
    kubelet source code analysis: introduction and startup
    kubelet source code analysis: starting a Pod
    kubelet source code analysis: shutting down a Pod
    kubelet source code analysis: watching Pod changes
    scheduler source code analysis: the scheduling flow
    apiserver source code analysis: the startup flow
    apiserver source code analysis: handling requests

