Pod 启动流程详解
1. 概述
在 Kubernetes 集群中,每个 Node 节点上都会启动一个 Kubelet 服务进程,该进程用于处理 Master 下发到本节点的 Pod 并管理其生命周期。换句话说,Pod 的创建、删除、更新等操作,都是由 kubelet 进行管理的,它将处理 Pod 与 Container Runtime 之间所有的转换逻辑,包括挂载卷、容器日志、垃圾回收等。
kubelet 可以通过以下几种方式获取本节点上需要管理的 Pod 的信息:
- file:kubelet启动参数“--config”指定的配置文件目录下的文件(默认目录为“/etc/kubernetes/manifests/”)。
- http:通过“--manifest-url”参数设置。
- api server:kubelet通过API Server监听etcd目录,同步Pod列表。
其中,以配置文件方式和http方式创建的Pod叫做静态pod(static pod)。简单介绍如下:
静态Pod是由kubelet进行管理的仅存在于特定Node上的Pod。它们不能通过API Server进行管理,无法与ReplicationController、Deployment 或者DaemonSet进行关联,并且kubelet无法对它们进行健康检查。静态 Pod总是由kubelet创建的,并且总在kubelet所在的Node上运行。
本文所讲的是通过 api server 方式创建的 pod。
一个 pod 的创建要从客户端键入kubectl create -f xxx.yaml
开始,这里假定:
- 客户端发送的HTTP请求已经通过校验(API Server的身份认证、鉴权和准入控制);
- 已经把资源对象信息持久化到 etcd 中;
- master节点的调度器已经把pod调度到合适的节点上;
接下的步骤就涉及到在工作节点上真正创建 pod,这也是本文的重点,这部分由 kubelet 组件负责。这部分内容结合这篇文章一同消化。
2. 源码分析
1. kubelet 循环控制(syncLoop)
syncLoop()函数是同步Pod状态的主函数,是一个永远都不会退出的无限循环函数。
syncLoop 中首先定义了一个 syncTicker 和 housekeepingTicker,即使没有需要更新的 pod 配置,kubelet 也会定时去做同步 pod 和清理 pod 的工作。然后在 for 循环中一直调用 syncLoopIteration,如果在每次循环过程中出现比较严重的错误,kubelet 会记录到 runtimeState 中,遇到错误就等待 5 秒,然后继续循环。
// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
klog.Info("Starting kubelet main sync loop.")
// The syncTicker wakes up kubelet to checks if there are any pod workers
// that need to be sync'd. A one-second period is sufficient because the
// sync interval is defaulted to 10s.
// syncTicker 每1秒检测一次是否有需要同步的 pod workers
syncTicker := time.NewTicker(time.Second)
defer syncTicker.Stop()
// 每2秒检测一次是否有需要清理的 pod
housekeepingTicker := time.NewTicker(housekeepingPeriod)
defer housekeepingTicker.Stop()
// pod 的生命周期变化
plegCh := kl.pleg.Watch()
const (
base = 100 * time.Millisecond
max = 5 * time.Second
factor = 2
)
duration := base
// Responsible for checking limits in resolv.conf
// The limits do not have anything to do with individual pods
// Since this is called in syncLoop, we don't need to call it anywhere else
if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
kl.dnsConfigurer.CheckLimitsForResolvConf()
}
for {
if err := kl.runtimeState.runtimeErrors(); err != nil {
klog.Errorf("skipping pod synchronization - %v", err)
// exponential backoff
time.Sleep(duration)
duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
continue
}
// reset backoff if we have a success
duration = base
kl.syncLoopMonitor.Store(kl.clock.Now())
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
kl.syncLoopMonitor.Store(kl.clock.Now())
}
}
2. 监听 Pod 变化(syncLoopIteration)
syncLoopIteration() 会对多个 channel 进行遍历,一旦发现某个 channel 有消息就交给对应的 handler 去处理。它会从以下 channel 中获取消息:
- configCh:该信息源由 kubeDeps 对象中的 PodConfig 子模块提供,该模块将同时 watch 3 个不同来源的 pod 信息的变化(file,http,apiserver),一旦某个来源的 pod 信息发生了更新(创建/更新/删除),这个 channel 中就会出现被更新的 pod 信息和更新的具体操作。
- syncCh:定时器管道,每隔一秒去同步最新保存的 pod 状态
- houseKeepingCh:负责 pod 清理工作
- plegCh:该信息源由 kubelet 对象中的 pleg 子模块提供,该模块主要用于周期性地向 container runtime 查询当前所有容器的状态,如果状态发生变化,则这个 channel 产生事件。
- livenessManager.Updates():健康检查发现某个 pod 不可用,kubelet 将根据 Pod 的 restartPolicy 自动执行正确的操作
// syncLoopIteration reads from various channels and dispatches pods to the
// given handler.
//
// Arguments:
// 1. configCh: a channel to read config events from
// 2. handler: the SyncHandler to dispatch pods to
// 3. syncCh: a channel to read periodic sync events from
// 4. houseKeepingCh: a channel to read housekeeping events from
// 5. plegCh: a channel to read PLEG updates from
//
// Events are also read from the kubelet liveness manager's update channel.
//
// The workflow is to read from one of the channels, handle that event, and
// update the timestamp in the sync loop monitor.
//
// With that in mind, in truly no particular order, the different channels
// are handled as follows:
//
// * configCh: dispatch the pods for the config change to the appropriate
// handler callback for the event type
// * plegCh: update the runtime cache; sync pod
// * syncCh: sync all pods waiting for sync
// * houseKeepingCh: trigger cleanup of pods
// * liveness manager: sync pods that have failed or in which one or more
// containers have failed liveness checks
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
// 1.configCh: Update from a config source; dispatch it to the right handler callback.
case u, open := <-configCh:
...
switch u.Op {
case kubetypes.ADD:
...
case kubetypes.UPDATE:
...
case kubetypes.REMOVE:
...
case kubetypes.RECONCILE:
...
case kubetypes.DELETE:
...
case kubetypes.RESTORE:
...
case kubetypes.SET:
...
}
...
// 2.plegCh
case e := <-plegCh:
...
// 3.syncCh: Sync pods waiting for sync
case <-syncCh:
....
// 4.livenessManager
case update := <-kl.livenessManager.Updates():
...
// 5.housekeepingCh
case <-housekeepingCh:
...
}
return true
}
3. 处理新增 Pod(HandlePodAddtions)
对于事件中的每个Pod,HandlePodAddtions()会执行以下操作:
- 把所有的Pod根据其创建时间进行排序,保证最早创建的Pod会最先被处理;
- 遍历Pod列表,把Pod逐个加入到podManager中。podManager 子模块负责管理这台机器上的 pod 的信息、pod 和 mirrorPod 之间的对应关系等等。所有被管理的 pod 都要出现在里面,如果 podManager 中找不到某个 pod,就认为这个 pod 被删除了。
- 判断是否是 mirror pod(即 static pod),如果是 mirror pod 则调用其单独的方法;
- 验证 pod 是否能在该节点运行,如果不可以直接拒绝;
- 通过 dispatchWork 把创建 pod 的工作下发给 podWorkers 子模块做异步处理;
- 在 probeManager 中添加 pod,如果 pod 中定义了 readiness 和 liveness 健康检查,启动 goroutine 定期进行检测。
// HandlePodAdditions is the callback in SyncHandler for pods being added from
// a config source.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
start := kl.clock.Now()
// 1.把所有的 Pod 根据其创建时间进行排序,保证最早创建的 Pod 会最先被处理;
sort.Sort(sliceutils.PodsByCreationTime(pods))
for _, pod := range pods {
// Responsible for checking limits in resolv.conf
if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
kl.dnsConfigurer.CheckLimitsForResolvConf()
}
existingPods := kl.podManager.GetPods()
// 2. 把 pod 加入到 podManager 中
// Always add the pod to the pod manager. Kubelet relies on the pod
// manager as the source of truth for the desired state. If a pod does
// not exist in the pod manager, it means that it has been deleted in
// the apiserver and no action (other than cleanup) is required.
kl.podManager.AddPod(pod)
// 3. 判断是否是 mirror pod(即 static pod)
if kubepod.IsMirrorPod(pod) {
kl.handleMirrorPod(pod, start)
continue
}
if !kl.podIsTerminated(pod) {
// Only go through the admission process if the pod is not
// terminated.
// We failed pods that we rejected, so activePods include all admitted
// pods that are alive.
activePods := kl.filterOutTerminatedPods(existingPods)
// 通过 canAdmitPod 方法校验Pod能否在该计算节点创建(如:磁盘空间)
// Check if we can admit the pod; if not, reject it.
if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
kl.rejectPod(pod, reason, message)
continue
}
}
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
// 通过 dispatchWork 把创建 pod 的工作下发给 podWorkers 子模块做异步处理
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
// 在 probeManager 中添加 pod,如果 pod 中定义了 readiness 和 liveness 健康检查,启动 goroutine 定期进行检测。
kl.probeManager.AddPod(pod)
}
}
static pod 是由 kubelet 直接管理的,k8s apiserver 并不会感知到 static pod 的存在,当然也不会和任何一个 rs 关联上,完全是由 kubelet 进程来监管,并在它异常时负责重启。Kubelet 会通过 apiserver 为每一个 static pod 创建一个对应的 mirror pod,如此一来就可以通过 kubectl 命令查看对应的 pod,并且可以通过 kubectl logs 命令直接查看到static pod 的日志信息。
4. 下发任务(dispatchWork)
dispatchWorker() 的主要作用是把某个对 Pod 的操作(创建/更新/删除)下发给 podWorkers。
// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod is terminated, dispatchWork
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
if kl.podIsTerminated(pod) {
if pod.DeletionTimestamp != nil {
// If the pod is in a terminated state, there is no pod worker to
// handle the work item. Check if the DeletionTimestamp has been
// set, and force a status update to trigger a pod deletion request
// to the apiserver.
kl.statusManager.TerminatePod(pod)
}
return
}
// Run the sync in an async worker.
kl.podWorkers.UpdatePod(&UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: syncType,
OnCompleteFunc: func(err error) {
if err != nil {
metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start))
metrics.DeprecatedPodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
}
},
})
// Note the number of containers for new pods.
if syncType == kubetypes.SyncPodCreate {
metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
}
}
5. 更新事件的channel(UpdatePod)
podWorkers 子模块主要的作用就是处理针对每一个的 Pod 的更新事件,比如 Pod 的创建,删除,更新。而 podWorkers 采取的基本思路是:为每一个 Pod 都单独创建一个 goroutine 和更新事件的 channel,goroutine 会阻塞式的等待 channel 中的事件,并且对获取的事件进行处理。而 podWorkers 对象自身则主要负责对更新事件进行下发。
podWorkers.UpdatePod() 方法如下:
// Apply the new setting to the specified pod.
// If the options provide an OnCompleteFunc, the function is invoked if the update is accepted.
// Update requests are ignored if a kill pod request is pending.
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
pod := options.Pod
uid := pod.UID
var podUpdates chan UpdatePodOptions
var exists bool
p.podLock.Lock()
defer p.podLock.Unlock()
if podUpdates, exists = p.podUpdates[uid]; !exists {
// We need to have a buffer here, because checkForUpdates() method that
// puts an update into channel is called from the same goroutine where
// the channel is consumed. However, it is guaranteed that in such case
// the channel is empty, so buffer of size 1 is enough.
// 创建channel
podUpdates = make(chan UpdatePodOptions, 1)
p.podUpdates[uid] = podUpdates
// Creating a new pod worker either means this is a new pod, or that the
// kubelet just restarted. In either case the kubelet is willing to believe
// the status of the pod for the first pod worker sync. See corresponding
// comment in syncPod.
// 启动一个goroutine
go func() {
defer runtime.HandleCrash()
p.managePodLoop(podUpdates)
}()
}
// 下发更新事件
if !p.isWorking[pod.UID] {
p.isWorking[pod.UID] = true
podUpdates <- *options
} else {
// if a request to kill a pod is pending, we do not let anything overwrite that request.
update, found := p.lastUndeliveredWorkUpdate[pod.UID]
if !found || update.UpdateType != kubetypes.SyncPodKill {
p.lastUndeliveredWorkUpdate[pod.UID] = *options
}
}
}
6. 调用 syncPodFn 方法同步 pod(managePodLoop)
managePodLoop() 调用 syncPodFn 方法去同步 pod,syncPodFn 实际上就是 kubelet.SyncPod(接下来马上讲到)。在完成这次 sync 动作之后,会调用 wrapUp 函数。
这个函数将会做几件事情:
- 将这个 pod 信息插入 kubelet 的 workQueue 队列中,等待下一次周期性的对这个 pod 的状态进行 sync
- 将在这次 sync 期间堆积的没有能够来得及处理的最近一次 update 操作加入 goroutine 的事件 channel 中,立即处理。
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
var lastSyncTime time.Time
for update := range podUpdates {
err := func() error {
podUID := update.Pod.UID
// This is a blocking call that would return only if the cache
// has an entry for the pod that is newer than minRuntimeCache
// Time. This ensures the worker doesn't start syncing until
// after the cache is at least newer than the finished time of
// the previous sync.
status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
if err != nil {
...
}
err = p.syncPodFn(syncPodOptions{
mirrorPod: update.MirrorPod,
pod: update.Pod,
podStatus: status,
killPodOptions: update.KillPodOptions,
updateType: update.UpdateType,
})
lastSyncTime = time.Now()
return err
}()
// notify the call-back function if the operation succeeded or not
if update.OnCompleteFunc != nil {
update.OnCompleteFunc(err)
}
if err != nil {
...
}
p.wrapUp(update.Pod.UID, err)
}
}
7. 完成创建容器前的准备工作【核心】(syncPod)
在syncPod()方法中,主要完成以下几件事情:
-
如果状态是删除pod,立即执行并返回;如果pod的状态是正在被创建,则记录pod worker的启动延时;
-
调用generateAPIPodStatus()创建PodStatus对象,并同步 PodStatus 到 kubelet.statusManager(状态管理器)
- Pod 的
status
定义在PodStatus对象中,其中有一个phase
字段,它是 Pod 在其生命周期中的简单宏观概述。
- Pod 的
-
检查 pod 是否能运行在本节点;
-
在 statusManager 中更新 pod 的状态;
-
kill 掉不应该被运行的pod;(Kill the pod if it should not be running)
- 哪些pod属于不应该运行的呢?根据判断条件,是这3类:!runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed。比如,在第3步中会检查该pod能否运行在该节点上,如果不能,则kill掉这个pod
-
如果 Kubelet 启动时指定了 --cgroups-per-qos 参数,Kubelet 就会为该 Pod 创建 cgroup 并设置对应的资源限制。
-
如果是 static Pod,就创建或者更新对应的 mirrorPod
-
为 pod 创建数据目录,通常包括:
- Pod 目录 (通常是
/var/run/kubelet/pods/<podID>
);
- Pod 的挂载卷目录 (
<podDir>/volumes
); - Pod 的插件目录 (
<podDir>/plugins
)。
- Pod 目录 (通常是
-
卷管理器(Volume manager)会挂载
Spec.Volumes
中定义的相关数据卷,然后等待挂载成功; -
为 pod 拉取 secrets;
-
通过调用 container runtime 的 SyncPod 方法,真正实现容器的启动。也就是说,前面所做的工作只是容器启动的准备工作。
可以看到该方法是创建 pod 实体(即容器)之前需要完成的准备工作。
// The workflow is:
// * If the pod is being created, record pod worker start latency
// * Call generateAPIPodStatus to prepare an v1.PodStatus for the pod
// * If the pod is being seen as running for the first time, record pod
// start latency
// * Update the status of the pod in the status manager
// * Kill the pod if it should not be running
// * Create a mirror pod if the pod is a static pod, and does not
// already have a mirror pod
// * Create the data directories for the pod if they do not exist
// * Wait for volumes to attach/mount
// * Fetch the pull secrets for the pod
// * Call the container runtime's SyncPod callback
// * Update the traffic shaping for the pod's ingress and egress limits
//
// If any step of this workflow errors, the error is returned, and is repeated
// on the next syncPod call.
//
// This operation writes all events that are dispatched in order to provide
// the most accurate information possible about an error situation to aid debugging.
// Callers should not throw an event if this operation returns an error.
func (kl *Kubelet) syncPod(o syncPodOptions) error {
// pull out the required options
pod := o.pod
mirrorPod := o.mirrorPod
podStatus := o.podStatus
updateType := o.updateType
// if we want to kill a pod, do it now!
// 判断是否删除 pod
if updateType == kubetypes.SyncPodKill {
...
}
...
// Record pod worker start latency if being created
// TODO: make pod workers record their own latencies
// 判断是否创建 pod
// 如果 Pod 正在创建, 就会暴露一些指标(metrics),可以用于在 Prometheus 中追踪 Pod 启动延时;
if updateType == kubetypes.SyncPodCreate {
if !firstSeenTime.IsZero() {
// This is the first time we are syncing the pod. Record the latency
// since kubelet first saw the pod if firstSeenTime is set.
metrics.PodWorkerStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
metrics.DeprecatedPodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
} else {
klog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
}
}
// Generate final API pod status with pod and status manager status
// 生成一个 APIPodStatus 对象,表示 Pod 当前阶段的状态。
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
podStatus.IP = apiPodStatus.PodIP
// Record the time it takes for the pod to become running.
existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
!firstSeenTime.IsZero() {
metrics.PodStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
metrics.DeprecatedPodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
}
// 检查 pod 是否能运行在本节点
runnable := kl.canRunPod(pod)
if !runnable.Admit {
...
}
// Update status in the status manager
// 更新pod状态,statusManage的任务是通过 kube-apiserver 异步更新 etcd 中的记录;
kl.statusManager.SetPodStatus(pod, apiPodStatus)
// Kill pod if it should not be running
// 如果pod处于非running状态,则直接kill掉
if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
...
}
// If the network plugin is not ready, only start the pod if it uses the host network
// 加载网络插件
if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, err)
return fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err)
}
// Create Cgroups for the pod and apply resource parameters
// to them if cgroups-per-qos flag is enabled.
// 如果 Kubelet 启动时指定了 --cgroups-per-qos 参数,
// Kubelet 就会为该 Pod 创建 cgroup 并设置对应的资源限制。
// 这是为了更好的 Pod 服务质量(QoS)
pcm := kl.containerManager.NewPodContainerManager()
// If pod has already been terminated then we need not create
// or update the pod's cgroup
if !kl.podIsTerminated(pod) {
...
// Create and Update pod's Cgroups
// 创建并更新pod的cgroups
if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
...
}
}
// Create Mirror Pod for Static Pod if it doesn't already exist
if kubepod.IsStaticPod(pod) {
...
}
// Make data directories for the pod
// 创建数据目录
if err := kl.makePodDataDirs(pod); err != nil {
...
}
// Volume manager will not mount volumes for terminated pods
// 挂载volume
if !kl.podIsTerminated(pod) {
// Wait for volumes to attach/mount
...
}
}
// Fetch the pull secrets for the pod
// 获取secret信息
pullSecrets := kl.getPullSecretsForPod(pod)
// Call the container runtime's SyncPod callback
// 调用containerRuntime的SyncPod方法开始创建容器
result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
kl.reasonCache.Update(pod.UID, result)
if err := result.Error(); err != nil {
...
}
return nil
}
8. 创建容器
kuberuntime 子模块的 SyncPod() 函数真正负责pod实体的创建,SyncPod() 通过执行以下几件事情来同步正在运行的pod的状态至desired状态:
- Compute sandbox and container changes.
- Kill pod sandbox if necessary.
- Kill any containers that should not be running.
- Create sandbox if necessary.
- Create init containers.
- Create normal containers.(即启动工作负载容器)
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
// Step 1: Compute sandbox and container changes.
podContainerChanges := m.computePodActions(pod, podStatus)
klog.V(3).Infof("computePodActions got %+v for pod %q", podContainerChanges, format.Pod(pod))
if podContainerChanges.CreateSandbox {
ref, err := ref.GetReference(legacyscheme.Scheme, pod)
if err != nil {
klog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)
}
if podContainerChanges.SandboxID != "" {
m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
} else {
klog.V(4).Infof("SyncPod received new pod %q, will create a sandbox for it", format.Pod(pod))
}
}
// Step 2: Kill the pod if the sandbox has changed.
if podContainerChanges.KillPod {
if podContainerChanges.CreateSandbox {
klog.V(4).Infof("Stopping PodSandbox for %q, will start new one", format.Pod(pod))
} else {
klog.V(4).Infof("Stopping PodSandbox for %q because all other containers are dead.", format.Pod(pod))
}
killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
result.AddPodSyncResult(killResult)
if killResult.Error() != nil {
klog.Errorf("killPodWithSyncResult failed: %v", killResult.Error())
return
}
if podContainerChanges.CreateSandbox {
m.purgeInitContainers(pod, podStatus)
}
} else {
// Step 3: kill any running containers in this pod which are not to keep.
for containerID, containerInfo := range podContainerChanges.ContainersToKill {
klog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerInfo.name, containerID, format.Pod(pod))
killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
result.AddSyncResult(killContainerResult)
if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
klog.Errorf("killContainer %q(id=%q) for pod %q failed: %v", containerInfo.name, containerID, format.Pod(pod), err)
return
}
}
}
// Keep terminated init containers fairly aggressively controlled
// This is an optimization because container removals are typically handled
// by container garbage collector.
m.pruneInitContainersBeforeStart(pod, podStatus)
// We pass the value of the podIP down to generatePodSandboxConfig and
// generateContainerConfig, which in turn passes it to various other
// functions, in order to facilitate functionality that requires this
// value (hosts file and downward API) and avoid races determining
// the pod IP in cases where a container requires restart but the
// podIP isn't in the status manager yet.
//
// We default to the IP in the passed-in pod status, and overwrite it if the
// sandbox needs to be (re)started.
podIP := ""
if podStatus != nil {
podIP = podStatus.IP
}
// Step 4: Create a sandbox for the pod if necessary.
podSandboxID := podContainerChanges.SandboxID
if podContainerChanges.CreateSandbox {
...
podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
if err != nil {
...
}
podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
if err != nil {
...
}
// If we ever allow updating a pod from non-host-network to
// host-network, we may use a stale IP.
// 如果 pod 网络是 host 模式,容器也相同;其他情况下,容器会使用 None 网络模式,让 kubelet 的网络插件自己进行网络配置
if !kubecontainer.IsHostNetworkPod(pod) {
// Overwrite the podIP passed in the pod status, since we just started the pod sandbox.
podIP = m.determinePodSandboxIP(pod.Namespace, pod.Name, podSandboxStatus)
klog.V(4).Infof("Determined the ip %q for pod %q after sandbox changed", podIP, format.Pod(pod))
}
}
// Get podSandboxConfig for containers to start.
configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
result.AddSyncResult(configPodSandboxResult)
// 获取PodSandbox的配置,如metadata,clusterDNS,容器的端口映射等
podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
if err != nil {
...
}
// Step 5: start the init container.
if container := podContainerChanges.NextInitContainerToStart; container != nil {
...
if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP); err != nil {
...
}
...
}
// Step 6: start containers in podContainerChanges.ContainersToStart.
// 即启动业务容器
for _, idx := range podContainerChanges.ContainersToStart {
container := &pod.Spec.Containers[idx]
...
if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP); err != nil {
...
}
}
return
}
9. 启动容器
最后由 startContainer() 来启动容器,主要有以下几个步骤:
- 拉取容器的镜像。如果是私有仓库的镜像,就会使用 PodSpec 中指定的 imagePullSecrets 来拉取该镜像;
- 通过 CRI 创建容器。 Kubelet 使用 PodSpec 中的信息填充了一个
ContainerConfig
数据结构(在其中定义了 command, image, labels, mounts, devices, environment variables 等),然后通过 protobufs 发送给 CRI。 对于 Docker 来说,它会将这些信息反序列化并填充到自己的配置信息中,然后再发送给 Dockerd 守护进程。在这个过程中,它会将一些元数据(例如容器类型,日志路径,sandbox ID 等)添加到容器中; - 然后 Kubelet 将容器注册到 CPU 管理器,它通过使用
UpdateContainerResources
CRI 方法给容器分配给本地节点上的 CPU 资源; - 最后容器真正地启动;
- 如果 Pod 中包含 Container Lifecycle Hooks,容器启动之后就会运行这些 Hooks。 Hook 的类型包括两种:Exec(执行一段命令) 和 HTTP(发送HTTP请求)。如果 PostStart Hook 启动的时间过长、挂起或者失败,容器将永远不会变成 Running 状态。
// startContainer starts a container and returns a message indicates why it is failed on error.
// It starts the container through the following steps:
// * pull the image
// * create the container
// * start the container
// * run the post start lifecycle hooks (if applicable)
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, container *v1.Container, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string) (string, error) {
// Step 1: pull the image.
// 检查镜像是否存在,不存在则到 Docker Registry 或是 Private Registry 拉取镜像。
imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets, podSandboxConfig)
if err != nil {
...
}
// Step 2: create the container.
ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil {
...
}
// For a new container, the RestartCount should be 0
// 设置 RestartCount
restartCount := 0
containerStatus := podStatus.FindContainerStatusByName(container.Name)
if containerStatus != nil {
restartCount = containerStatus.RestartCount + 1
}
// 生成容器的配置信息
containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef)
...
// 调用 CRI 调用容器运行时创建容器
containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
if err != nil {
...
}
err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
if err != nil {
...
}
...
// Step 3: start the container.
// 真正的启动容器
err = m.runtimeService.StartContainer(containerID)
if err != nil {
...
}
m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.StartedContainer, fmt.Sprintf("Started container %s", container.Name))
// Symlink container logs to the legacy container log location for cluster logging
// support.
// TODO(random-liu): Remove this after cluster logging supports CRI container log path.
containerMeta := containerConfig.GetMetadata()
sandboxMeta := podSandboxConfig.GetMetadata()
legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,
sandboxMeta.Namespace)
containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)
// only create legacy symlink if containerLog path exists (or the error is not IsNotExist).
// Because if containerLog path does not exist, only dandling legacySymlink is created.
// This dangling legacySymlink is later removed by container gc, so it does not make sense
// to create it in the first place. it happens when journald logging driver is used with docker.
if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {
if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {
klog.Errorf("Failed to create legacy symbolic link %q to container %q log %q: %v",
legacySymlink, containerID, containerLog, err)
}
}
// Step 4: execute the post start hook.
if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
kubeContainerID := kubecontainer.ContainerID{
Type: m.runtimeName,
ID: containerID,
}
// runner.Run 这个方法的主要作用就是在业务容器起来的时候,
// 首先会执行一个 container hook(PostStart 和 PreStop),做一些预处理工作。
// 只有 container hook 执行成功才会运行具体的业务服务,否则容器异常。
msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
if handlerErr != nil {
...
}
}
return "", nil
}
Pod 创建流程:
3. 总结
本文分析了新建 pod 的流程,当一个 pod 完成调度,与一个 node 绑定起来之后,这个 pod 就会触发 kubelet 在循环控制里注册的 handler,上图中的 HandlePods 部分。此时,通过检查 pod 在 kubelet 内存中的状态,kubelet 就能判断出这是一个新调度过来的 pod,从而触发 Handler 里的 ADD 事件对应的逻辑处理。然后 kubelet 会为这个 pod 生成对应的 podStatus,接着检查 pod 所声明的 volume 是不是准备好了,然后调用下层的容器运行时。如果是 UPDATE 事件的话,kubelet 就会根据 pod 对象具体的变更情况,调用下层的容器运行时进行容器的重建。
参考: