Kubernetes Source Code Reading Notes: Kubelet (Part 4)

In the previous article we saw that, at runtime, the kubelet receives pod state changes through five channels, and that these updates are passed down layer by layer until they reach the syncPod method in kubelet.go.

1. The syncPod method in kubelet.go

The syncPod method is central: it holds the kubelet's basic logic for creating (and tearing down) a pod. Let's walk through it:

    pkg/kubelet/kubelet.go

func (kl *Kubelet) syncPod(o syncPodOptions) error {
    // pull out the required options
    pod := o.pod
    mirrorPod := o.mirrorPod
    podStatus := o.podStatus
    updateType := o.updateType

    // if we want to kill a pod, do it now!
    if updateType == kubetypes.SyncPodKill {
        killPodOptions := o.killPodOptions
        if killPodOptions == nil || killPodOptions.PodStatusFunc == nil {
            return fmt.Errorf("kill pod options are required if update type is kill")
        }
        apiPodStatus := killPodOptions.PodStatusFunc(pod, podStatus)
        kl.statusManager.SetPodStatus(pod, apiPodStatus)
        // we kill the pod with the specified grace period since this is a termination
        if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
            kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
            // there was an error killing the pod, so we return that error directly
            utilruntime.HandleError(err)
            return err
        }
        return nil
    }

    // Latency measurements for the main workflow are relative to the
    // first time the pod was seen by the API server.
    var firstSeenTime time.Time
    if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
        firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
    }

    // Record pod worker start latency if being created
    // TODO: make pod workers record their own latencies
    if updateType == kubetypes.SyncPodCreate {
        if !firstSeenTime.IsZero() {
            // This is the first time we are syncing the pod. Record the latency
            // since kubelet first saw the pod if firstSeenTime is set.
            metrics.PodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
        } else {
            klog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
        }
    }

    // Generate final API pod status with pod and status manager status
    apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
    // The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
    // TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
    // set pod IP to hostIP directly in runtime.GetPodStatus
    podStatus.IP = apiPodStatus.PodIP

    // Record the time it takes for the pod to become running.
    existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
    if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
        !firstSeenTime.IsZero() {
        metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
    }

    runnable := kl.canRunPod(pod)
    if !runnable.Admit {
        // Pod is not runnable; update the Pod and Container statuses to why.
        apiPodStatus.Reason = runnable.Reason
        apiPodStatus.Message = runnable.Message
        // Waiting containers are not creating.
        const waitingReason = "Blocked"
        for _, cs := range apiPodStatus.InitContainerStatuses {
            if cs.State.Waiting != nil {
                cs.State.Waiting.Reason = waitingReason
            }
        }
        for _, cs := range apiPodStatus.ContainerStatuses {
            if cs.State.Waiting != nil {
                cs.State.Waiting.Reason = waitingReason
            }
        }
    }

    // Update status in the status manager
    kl.statusManager.SetPodStatus(pod, apiPodStatus)

    // Kill pod if it should not be running
    if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
        var syncErr error
        if err := kl.killPod(pod, nil, podStatus, nil); err != nil {
            kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
            syncErr = fmt.Errorf("error killing pod: %v", err)
            utilruntime.HandleError(syncErr)
        } else {
            if !runnable.Admit {
                // There was no error killing the pod, but the pod cannot be run.
                // Return an error to signal that the sync loop should back off.
                syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
            }
        }
        return syncErr
    }

    // If the network plugin is not ready, only start the pod if it uses the host network
    if rs := kl.runtimeState.networkErrors(); len(rs) != 0 && !kubecontainer.IsHostNetworkPod(pod) {
        kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, rs)
        return fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, rs)
    }

    // Create Cgroups for the pod and apply resource parameters
    // to them if cgroups-per-qos flag is enabled.
    pcm := kl.containerManager.NewPodContainerManager()
    // If pod has already been terminated then we need not create
    // or update the pod's cgroup
    if !kl.podIsTerminated(pod) {
        // When the kubelet is restarted with the cgroups-per-qos
        // flag enabled, all the pod's running containers
        // should be killed intermittently and brought back up
        // under the qos cgroup hierarchy.
        // Check if this is the pod's first sync
        firstSync := true
        for _, containerStatus := range apiPodStatus.ContainerStatuses {
            if containerStatus.State.Running != nil {
                firstSync = false
                break
            }
        }
        // Don't kill containers in pod if pod's cgroups already
        // exists or the pod is running for the first time
        podKilled := false
        if !pcm.Exists(pod) && !firstSync {
            if err := kl.killPod(pod, nil, podStatus, nil); err == nil {
                podKilled = true
            }
        }
        // Create and Update pod's Cgroups
        // Don't create cgroups for run once pod if it was killed above
        // The current policy is not to restart the run once pods when
        // the kubelet is restarted with the new flag as run once pods are
        // expected to run only once and if the kubelet is restarted then
        // they are not expected to run again.
        // We don't create and apply updates to cgroup if its a run once pod and was killed above
        if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
            if !pcm.Exists(pod) {
                if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
                    klog.V(2).Infof("Failed to update QoS cgroups while syncing pod: %v", err)
                }
                if err := pcm.EnsureExists(pod); err != nil {
                    kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
                    return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
                }
            }
        }
    }

    // Create Mirror Pod for Static Pod if it doesn't already exist
    if kubepod.IsStaticPod(pod) {
        ...
    }

    // Make data directories for the pod
    if err := kl.makePodDataDirs(pod); err != nil {
        ...
    }

    // Volume manager will not mount volumes for terminated pods
    if !kl.podIsTerminated(pod) {
        // Wait for volumes to attach/mount
        if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
            kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to mount volumes for pod %q: %v", format.Pod(pod), err)
            klog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
            return err
        }
    }

    // Fetch the pull secrets for the pod
    pullSecrets := kl.getPullSecretsForPod(pod)

    // Call the container runtime's SyncPod callback
    result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
    kl.reasonCache.Update(pod.UID, result)
    if err := result.Error(); err != nil {
        // Do not return error if the only failures were pods in backoff
        for _, r := range result.SyncResults {
            if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff {
                // Do not record an event here, as we keep all event logging for sync pod failures
                // local to container runtime so we get better errors
                return err
            }
        }
        return nil
    }

    return nil
}

As the code shows, the method syncs a pod through the following steps:

(1) If this is a kill (delete) operation, handle it first and return.

(2) Record how long the pod took to go from first being seen to running (a small metrics sketch follows this list).

(3) Check whether the pod can run on this node; if not, record the reason in the pod and container statuses.

(4) Kill the pod if it should not be running.

(5) Check whether the network plugin is ready (unless the pod uses the host network).

(6) Create cgroups for the pod when cgroups-per-qos is enabled.

(7) Handle static pods (create the mirror pod) and create the pod's data directories.

(8) Have the volume manager attach and mount the pod's volumes.

(9) Fetch the image pull secrets for the pod.

(10) Call the container runtime's SyncPod method, which contains the main container-creation logic.
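To make step (2) more concrete, here is a small, self-contained sketch of how a start-latency histogram can be recorded with Prometheus client_golang. The metric name, buckets, and the firstSeen value are illustrative assumptions, not the kubelet's actual PodStartLatency definition (which records microseconds via metrics.SinceInMicroseconds).

package main

import (
    "fmt"
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

// demoPodStartLatency plays the role of metrics.PodStartLatency; the name and
// buckets are made up for this example.
var demoPodStartLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
    Name:    "demo_pod_start_duration_seconds",
    Help:    "Time from the pod first being seen to it running (illustrative).",
    Buckets: prometheus.DefBuckets,
})

func main() {
    prometheus.MustRegister(demoPodStartLatency)

    // Pretend the pod was first seen 1.5s ago; the kubelet reads this
    // timestamp from the ConfigFirstSeenAnnotationKey annotation.
    firstSeen := time.Now().Add(-1500 * time.Millisecond)

    // Mirrors the Observe call in syncPod, but in seconds.
    demoPodStartLatency.Observe(time.Since(firstSeen).Seconds())

    fmt.Println("observed pod start latency:", time.Since(firstSeen))
}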

Those are the steps the kubelet goes through when syncing a pod. Next, let's look at the container runtime's SyncPod method.

2. The SyncPod method in kuberuntime_manager.go

pkg/kubelet/kuberuntime/kuberuntime_manager.go

// SyncPod syncs the running pod into the desired pod by executing following steps:
//
// 1. Compute sandbox and container changes.
// 2. Kill pod sandbox if necessary.
// 3. Kill any containers that should not be running.
// 4. Create sandbox if necessary.
// 5. Create init containers.
// 6. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
    // Step 1: Compute sandbox and container changes.
    podContainerChanges := m.computePodActions(pod, podStatus)
    klog.V(3).Infof("computePodActions got %+v for pod %q", podContainerChanges, format.Pod(pod))
    if podContainerChanges.CreateSandbox {
        ...
    }

    // Step 2: Kill the pod if the sandbox has changed.
    if podContainerChanges.KillPod {
        ...
    } else {
        // Step 3: kill any running containers in this pod which are not to keep.
        for containerID, containerInfo := range podContainerChanges.ContainersToKill {
            klog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerInfo.name, containerID, format.Pod(pod))
            killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
            result.AddSyncResult(killContainerResult)
            if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
                killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
                klog.Errorf("killContainer %q(id=%q) for pod %q failed: %v", containerInfo.name, containerID, format.Pod(pod), err)
                return
            }
        }
    }

    ...

    // Step 4: Create a sandbox for the pod if necessary.
    podSandboxID := podContainerChanges.SandboxID
    if podContainerChanges.CreateSandbox {
        var msg string
        var err error

        klog.V(4).Infof("Creating sandbox for pod %q", format.Pod(pod))
        createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
        result.AddSyncResult(createSandboxResult)
        podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
        ...
    }

    ...

    // Step 5: start the init container.
    if container := podContainerChanges.NextInitContainerToStart; container != nil {
        // Start the next init container.
        startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
        result.AddSyncResult(startContainerResult)
        isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
        if isInBackOff {
            startContainerResult.Fail(err, msg)
            klog.V(4).Infof("Backing Off restarting init container %+v in pod %v", container, format.Pod(pod))
            return
        }

        klog.V(4).Infof("Creating init container %+v in pod %v", container, format.Pod(pod))
        if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeInit); err != nil {
            startContainerResult.Fail(err, msg)
            utilruntime.HandleError(fmt.Errorf("init container start failed: %v: %s", err, msg))
            return
        }

        // Successfully started the container; clear the entry in the failure
        klog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
    }

    // Step 6: start containers in podContainerChanges.ContainersToStart.
    for _, idx := range podContainerChanges.ContainersToStart {
        container := &pod.Spec.Containers[idx]
        startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
        result.AddSyncResult(startContainerResult)

        isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
        if isInBackOff {
            startContainerResult.Fail(err, msg)
            klog.V(4).Infof("Backing Off restarting container %+v in pod %v", container, format.Pod(pod))
            continue
        }

        klog.V(4).Infof("Creating container %+v in pod %v", container, format.Pod(pod))
        if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeRegular); err != nil {
            startContainerResult.Fail(err, msg)
            // known errors that are logged in other places are logged at higher levels here to avoid
            // repetitive log spam
            switch {
            case err == images.ErrImagePullBackOff:
                klog.V(3).Infof("container start failed: %v: %s", err, msg)
            default:
                utilruntime.HandleError(fmt.Errorf("container start failed: %v: %s", err, msg))
            }
            continue
        }
    }

    return
}

The comments make the flow clear: SyncPod first calls computePodActions to compare the sandbox and container state against the desired pod and handles the differences accordingly (killing the sandbox or any unwanted containers). Then, for new containers, it creates the sandbox if necessary, starts the next init container, and finally starts the regular containers. Skipping the earlier parts, the key point is at the end, where containers are started by calling kubeGenericRuntimeManager's startContainer method. A standalone sketch of the kind of value computePodActions returns is shown below.
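To visualize the "changes" computed in Step 1, here is a simplified, self-contained stand-in for the value computePodActions returns. The field names mirror the ones used in the listing above (KillPod, CreateSandbox, SandboxID, Attempt, NextInitContainerToStart, ContainersToStart, ContainersToKill), but the field types are flattened to basic Go types for illustration; the real, unexported type in kuberuntime_manager.go uses v1 and kubecontainer types.

package main

import "fmt"

// podActions is a simplified stand-in for the result of computePodActions.
type podActions struct {
    KillPod                  bool              // kill the whole pod (e.g. the sandbox changed)
    CreateSandbox            bool              // a new sandbox is needed
    SandboxID                string            // sandbox to reuse when not recreating
    Attempt                  uint32            // sandbox attempt number
    NextInitContainerToStart string            // name of the next init container, "" if none
    ContainersToStart        []int             // indexes into pod.Spec.Containers
    ContainersToKill         map[string]string // container ID -> reason it must go
}

func main() {
    // Example decision for a pod whose sandbox died: recreate everything.
    actions := podActions{
        KillPod:                  true,
        CreateSandbox:            true,
        Attempt:                  1,
        NextInitContainerToStart: "init-db",
        ContainersToStart:        []int{0, 1},
        ContainersToKill:         map[string]string{},
    }
    fmt.Printf("SyncPod would act on: %+v\n", actions)
}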

3. startContainer

The startContainer method defines the concrete steps the kubelet takes to start a container:

pkg/kubelet/kuberuntime/kuberuntime_container.go
    // startContainer starts a container and returns a message indicates why it is failed on error.
    // It starts the container through the following steps:
    // * pull the image
    // * create the container
    // * start the container
    // * run the post start lifecycle hooks (if applicable)
    func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, container *v1.Container, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, containerType kubecontainer.ContainerType) (string, error) {
        // Step 1: pull the image.
        imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets, podSandboxConfig)
        if err != nil {
            m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
            return msg, err
        }
    
        // Step 2: create the container.
        ref, err := kubecontainer.GenerateContainerRef(pod, container)
        if err != nil {
            klog.Errorf("Can't make a ref to pod %q, container %v: %v", format.Pod(pod), container.Name, err)
        }
        klog.V(4).Infof("Generating ref for container %s: %#v", container.Name, ref)
    
        // For a new container, the RestartCount should be 0
        restartCount := 0
        containerStatus := podStatus.FindContainerStatusByName(container.Name)
        if containerStatus != nil {
            restartCount = containerStatus.RestartCount + 1
        }
    
        containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef, containerType)
        if cleanupAction != nil {
            defer cleanupAction()
        }
        if err != nil {
            m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
            return grpc.ErrorDesc(err), ErrCreateContainerConfig
        }
    
        containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
        if err != nil {
            m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
            return grpc.ErrorDesc(err), ErrCreateContainer
        }
        err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
        if err != nil {
            m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Internal PreStartContainer hook failed: %v", grpc.ErrorDesc(err))
            return grpc.ErrorDesc(err), ErrPreStartHook
        }
        m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.CreatedContainer, "Created container")
    
        if ref != nil {
            m.containerRefManager.SetRef(kubecontainer.ContainerID{
                Type: m.runtimeName,
                ID:   containerID,
            }, ref)
        }
    
        // Step 3: start the container.
        err = m.runtimeService.StartContainer(containerID)
        if err != nil {
            m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToStartContainer, "Error: %v", grpc.ErrorDesc(err))
            return grpc.ErrorDesc(err), kubecontainer.ErrRunContainer
        }
        m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.StartedContainer, "Started container")
    
        // Symlink container logs to the legacy container log location for cluster logging
        // support.
        // TODO(random-liu): Remove this after cluster logging supports CRI container log path.
        containerMeta := containerConfig.GetMetadata()
        sandboxMeta := podSandboxConfig.GetMetadata()
        legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,
            sandboxMeta.Namespace)
        containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)
        // only create legacy symlink if containerLog path exists (or the error is not IsNotExist).
        // Because if containerLog path does not exist, only dandling legacySymlink is created.
        // This dangling legacySymlink is later removed by container gc, so it does not make sense
        // to create it in the first place. it happens when journald logging driver is used with docker.
        if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {
            if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {
                klog.Errorf("Failed to create legacy symbolic link %q to container %q log %q: %v",
                    legacySymlink, containerID, containerLog, err)
            }
        }
    
        // Step 4: execute the post start hook.
        if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
            kubeContainerID := kubecontainer.ContainerID{
                Type: m.runtimeName,
                ID:   containerID,
            }
            msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
            if handlerErr != nil {
                m.recordContainerEvent(pod, container, kubeContainerID.ID, v1.EventTypeWarning, events.FailedPostStartHook, msg)
                if err := m.killContainer(pod, kubeContainerID, container.Name, "FailedPostStartHook", nil); err != nil {
                    klog.Errorf("Failed to kill container %q(id=%q) in pod %q: %v, %v",
                        container.Name, kubeContainerID.String(), format.Pod(pod), ErrPostStartHook, err)
                }
                return msg, fmt.Errorf("%s: %v", ErrPostStartHook, handlerErr)
            }
        }
    
        return "", nil
    }

As the comment says, there are roughly four steps: pull the image, create the container, start the container, and run the post-start lifecycle hook. There is also some housekeeping, such as creating the legacy container log symlink.

What's worth noting is that whenever the method creates, starts, or kills a container, it goes through m.runtimeService, calling methods such as CreateContainer and StartContainer.

Take CreateContainer as an example. Following the call, we find that it invokes the CreateContainer method in pkg/kubelet/remote/remote_runtime.go, which in turn calls the CreateContainer method generated in another file:

    pkg/kubelet/apis/cri/runtime/v1alpha2/api.pb.go

func (c *runtimeServiceClient) CreateContainer(ctx context.Context, in *CreateContainerRequest, opts ...grpc.CallOption) (*CreateContainerResponse, error) {
    out := new(CreateContainerResponse)
    err := grpc.Invoke(ctx, "/runtime.v1alpha2.RuntimeService/CreateContainer", in, out, c.cc, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

Here we see that this CreateContainer method invokes RuntimeService's CreateContainer over gRPC. This is exactly the kubelet gRPC server we discussed in the second article (https://www.cnblogs.com/00986014w/p/10895532.html). With this, the definition and invocation of these methods inside the kubelet come full circle. A minimal sketch of driving the same generated client directly follows.
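To see this client from outside the kubelet, here is a minimal sketch that dials a CRI runtime socket and issues one RuntimeService RPC through the generated client in api.pb.go. The socket path is an assumption (containerd commonly listens on /run/containerd/containerd.sock; other runtimes differ), and Version is used instead of CreateContainer because it needs no pre-existing sandbox or container config.

package main

import (
    "context"
    "fmt"
    "net"
    "time"

    "google.golang.org/grpc"
    runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
)

func main() {
    // Dial the CRI runtime's unix socket; the path is an assumption and
    // depends on which runtime (dockershim, containerd, CRI-O) is in use.
    dial := func(ctx context.Context, addr string) (net.Conn, error) {
        return (&net.Dialer{}).DialContext(ctx, "unix", addr)
    }
    conn, err := grpc.Dial("/run/containerd/containerd.sock",
        grpc.WithInsecure(), grpc.WithContextDialer(dial))
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    // NewRuntimeServiceClient is generated in api.pb.go alongside the
    // runtimeServiceClient shown above; CreateContainer, StartContainer, etc.
    // are all methods on this client.
    client := runtimeapi.NewRuntimeServiceClient(conn)

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    resp, err := client.Version(ctx, &runtimeapi.VersionRequest{})
    if err != nil {
        panic(err)
    }
    fmt.Printf("runtime %s %s speaks CRI %s\n",
        resp.RuntimeName, resp.RuntimeVersion, resp.RuntimeApiVersion)
}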

4. Summary

Although the kubelet codebase is large, its logic is fairly clear. In essence, it creates a kubelet instance, starts the gRPC server, and then runs the kubelet, which receives pod state changes through five channels, performs the pod update, and ultimately invokes methods on the gRPC server over gRPC.

Original post: https://www.cnblogs.com/00986014w/p/10910837.html