
    [Source code analysis] Deep learning distributed training framework Horovod (20) --- Elastic Training Operator

    0x00 Abstract

    Horovod is a distributed training framework based on AllReduce. Thanks to its support for mainstream deep learning frameworks such as TensorFlow and PyTorch, and to features such as communication optimization, Horovod is widely used for data-parallel training.

    This is the last article on Horovod on Kubernetes. It looks at how MPI-Operator could be improved, mainly by studying the Elastic Training Operator source code along with the blog post from its authors' team, so this article consists largely of source code.

    Links to the other articles in this series:

    [Source code analysis] Deep learning distributed training framework Horovod (1) --- Basic knowledge

    [Source code analysis] Deep learning distributed training framework Horovod (2) --- From the user's perspective

    [Source code analysis] Deep learning distributed training framework Horovod (3) --- What horovodrun does behind the scenes

    [Source code analysis] Deep learning distributed training framework Horovod (4) --- Network basics & Driver

    [Source code analysis] Deep learning distributed training framework Horovod (5) --- Fusion framework

    [Source code analysis] Deep learning distributed training framework Horovod (6) --- Background thread architecture

    [Source code analysis] Deep learning distributed training framework Horovod (7) --- DistributedOptimizer

    [Source code analysis] Deep learning distributed training framework Horovod (8) --- On Spark

    [Source code analysis] Deep learning distributed training framework Horovod (9) --- Launching on Spark

    [Source code analysis] Deep learning distributed training framework Horovod (10) --- Run on Spark

    [Source code analysis] Deep learning distributed training framework Horovod (11) --- On Spark --- the GLOO scheme

    [Source code analysis] Deep learning distributed training framework Horovod (12) --- Overall architecture of elastic training

    [Source code analysis] Deep learning distributed training framework Horovod (13) --- Elastic training: Driver

    [Source code analysis] Deep learning distributed training framework Horovod (14) --- Elastic training: node discovery & State

    [Source code analysis] Deep learning distributed training framework Horovod (15) --- Broadcast & notification

    [Source code analysis] Deep learning distributed training framework Horovod (16) --- Elastic training: worker lifecycle

    [Source code analysis] Deep learning distributed training framework Horovod (17) --- Elastic training: fault tolerance

    [Source code analysis] Deep learning distributed training framework Horovod (18) --- kubeflow tf-operator

    [Source code analysis] Deep learning distributed training framework Horovod (19) --- kubeflow MPI-operator

    0x01 Background

    Sections 0x01 and 0x02 are both based on the Elastic Training Operator team's blog post, which is excellent.

    1.1 Existing elasticity

    Kubernetes and cloud computing provide agility and scalability. With components such as cluster-autoscaler we can define elastic policies for training jobs, use Kubernetes' elasticity to create resources on demand, and reduce GPU idle time.

    However, this scaling model is still somewhat lacking for an offline workload such as training:

    • No fault tolerance: when some workers fail due to device problems, the whole job has to stop and start over.
    • Training jobs usually run for a long time and occupy a lot of compute, yet the job itself has no elasticity: when resources are tight, there is no way to free resources for other workloads on demand unless the job is terminated.
    • Because jobs run for a long time and workers cannot be reconfigured dynamically, preemptible (spot) instances cannot be used safely, so the best price/performance on the cloud cannot be achieved.

    Giving training jobs elasticity is therefore the key path to better cost efficiency. Recently, distributed frameworks such as Horovod have gradually added support for Elastic Training, i.e. a training job can dynamically scale its workers out or in during execution without interrupting the training. This requires small adaptations in the training code; see https://horovod.readthedocs.io/en/stable/elastic_include.html.

    1.2 Shortcomings of mpi-operator

    In mpi-operator, the workers participating in training are designed and maintained as static resources. Supporting the elastic training mode makes jobs more flexible, but it also brings challenges to the operations layer, for example:

    • Jobs must be started through horovodrun provided by Horovod; the launcher logs into the workers via SSH, so the SSH tunnel between launcher and workers has to be set up.
    • The Elastic Driver module, which computes elasticity, obtains the latest worker topology from a user-specified discover_host script in order to start or stop worker instances. Whenever the workers change, the return value of the discover_host script must be updated first.
    • In scenarios such as preemption or price-based scheduling, it is sometimes necessary to scale in specific workers. Kubernetes' native orchestration primitives, Deployment and StatefulSet, cannot handle scaling in designated pods.

    To address these problems, we designed and developed et-operator. It provides a TrainingJob CRD to describe a training job, and ScaleOut and ScaleIn CRDs to describe scale-out and scale-in operations; combining them makes training jobs elastic. The solution is open source, and requirements, discussion and criticism are all welcome.

    Open-source repository: https://github.com/AliyunContainerService/et-operator

    0x02 Overall architecture

    The TrainingJob Controller mainly:

    • maintains the create/delete lifecycle of a TrainingJob and manages its child resources;
    • executes scale-out and scale-in operations;
    • provides fault tolerance: when a worker is evicted, a new worker is created and joins the training.

    2.1 Resource creation

    The child resources of a TrainingJob are created in the following order:

    • Create the key pair needed for SSH, and create a secret.
    • Create the workers, each consisting of a service and a pod, and mount the secret's public key.
    • Create a configmap containing the discover_host script and the hostfile.
    • Create the launcher and mount the configmap. Because the hostfile will later be modified as the topology changes, it is copied by an init container from the configmap into a separate directory.

    TrainingJob related resources:

    (figure: TrainingJob child resources)

    2.2 Roles

    The configuration of a TrainingJob CR is split into Launcher and Worker. The Launcher part specifies the job image and the startup command. By default et-operator generates a hostfile and a discover_host script according to the worker allocation; the discover_host script is mounted into the Launcher at /etc/edl/discover_hosts.sh and passed to horovodrun in the entry script via the --host-discovery-script parameter. The Worker part specifies the worker image and GPU usage, and the allowed worker replica range can be set with maxReplicas / minReplicas.

    (figure: TrainingJob CR with Launcher and Worker roles)
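
    To make the two roles concrete, here is a hedged sketch of what a TrainingJob CR might look like. It is pieced together from the field names that appear in the source below (ETReplicaSpecs, Launcher, Worker, the pod Template, maxReplicas / minReplicas) and from the /etc/edl/discover_hosts.sh path and --host-discovery-script flag described above; the exact YAML key casing, the image placeholder, the replica numbers and the training script path are illustrative assumptions, not an official example.

    apiVersion: kai.alibabacloud.com/v1alpha1
    kind: TrainingJob
    metadata:
      name: elastic-training
      namespace: default
    spec:
      etReplicaSpecs:
        launcher:
          replicas: 1
          template:
            spec:
              containers:
              - name: launcher
                image: <your-horovod-image>      # illustrative placeholder
                command:
                - sh
                - -c
                - horovodrun -np 2 --min-np 1 --max-np 9
                  --host-discovery-script /etc/edl/discover_hosts.sh
                  python /examples/elastic_train.py
        worker:
          replicas: 2
          minReplicas: 1
          maxReplicas: 9
          template:
            spec:
              containers:
              - name: worker
                image: <your-horovod-image>      # illustrative placeholder
                resources:
                  limits:
                    nvidia.com/gpu: 1            # GPU usage per worker, as described above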

    2.3 Main program flow

    (figure: main program flow chart)

    0x03 Entry point

    In fact, studying ETO mostly comes down to studying how scale-out and scale-in work. To get there, though, we still need to walk through the program logic first.

    Readers who are not familiar with Kubernetes can also use this as an example of how a CRD is used.

    3.1 Creation

    The entry point is the main function in main.go. From it we can see that:

    • a controller Manager is created;
    • with this Manager, three reconcilers are built: TrainingJobReconciler, ScaleInReconciler and ScaleOutReconciler;
    • then the Manager is started.
    func main() {
    	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
    		Scheme:             scheme,
    		MetricsBindAddress: metricsAddr,
    		LeaderElection:     enableLeaderElection,
    		Port:               9443,
    	})
    
    	const jobPollInterval = "5s"
      
    	if err = controllers.NewReconciler(mgr, parseDurationOrPanic(jobPollInterval)).SetupWithManager(mgr); err != nil {
    		os.Exit(1)
    	}
    	if err = controllers.NewScaleOutReconciler(mgr, parseDurationOrPanic(jobPollInterval)).SetupWithManager(mgr); err != nil {
    		os.Exit(1)
    	}
    	if err = controllers.NewScaleInReconciler(mgr, parseDurationOrPanic(jobPollInterval)).SetupWithManager(mgr); err != nil {
    		os.Exit(1)
    	}
    
    	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
    		os.Exit(1)
    	}
    }
    

    3.2 Setup

    The setup here registers the event handlers, i.e. which CRs each controller responds to.

    • Besides TrainingJob, et-operator also supports the ScaleOut and ScaleIn CRDs, which submit scale-out and scale-in operations for a training job.

    • When a ScaleOut CR is submitted, the ScaleOutController triggers Reconcile. The work is simple: using the Selector field in the ScaleOut CR, find the TrainingJob this scaler targets and set it on the CR's OwnerReferences.

    • The TrainingJobController notices that a ScaleOut CR owned by the TrainingJob has been updated, which triggers the TrainingJob's Reconcile. It iterates over the ScaleIn and ScaleOut CRs whose OwnerReference points to the TrainingJob and decides, based on creation time and status time, which scale-out or scale-in to execute.

    • When scaling in, the workers to remove can be specified via the spec.toDelete.count or spec.toDelete.podNames field of the ScaleIn CR. If count is used, workers are removed by index from high to low.

    func (r *ScaleInReconciler) SetupWithManager(mgr ctrl.Manager) error {
    	return ctrl.NewControllerManagedBy(mgr).
    		For(&kaiv1alpha1.ScaleIn{}).
    		Complete(r)
    }
    
    func (r *ScaleOutReconciler) SetupWithManager(mgr ctrl.Manager) error {
    	return ctrl.NewControllerManagedBy(mgr).
    		For(&kaiv1alpha1.ScaleOut{}).
    		Complete(r)
    }
    
    func (r *TrainingJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
    	return ctrl.NewControllerManagedBy(mgr).
    		For(&kaiv1alpha1.TrainingJob{}).
    		Owns(&kaiv1alpha1.ScaleIn{}).
    		Owns(&kaiv1alpha1.ScaleOut{}).
    		Owns(&corev1.Pod{}).
    		Owns(&corev1.Service{}).
    		Owns(&corev1.ConfigMap{}).
    		Owns(&corev1.Secret{}).
    		// Ignore status-only and metadata-only updates
    		//WithEventFilter(predicate.GenerationChangedPredicate{}).
    		Complete(r)
    }
    

    0x04 TrainingJobReconciler

    Let's follow the code and look at the finer points of its design.

    4.1 Reconcile

    In a Kubernetes operator, the Reconcile method keeps watching resources: whenever a watched resource changes, Reconcile is triggered, so in theory it runs once for every change.

    When a request arrives, the Reconcile method is invoked.

    func (r *TrainingJobReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
    	// Fetch latest training job instance.
    	sharedTrainingJob := &kaiv1alpha1.TrainingJob{}
    	err := r.Get(context.Background(), req.NamespacedName, sharedTrainingJob)
    	trainingJob := sharedTrainingJob.DeepCopy()
    	// Check reconcile is required.
    	// No need to do reconcile or job has been deleted.
    	r.Scheme.Default(trainingJob)
    	return r.ReconcileJobs(trainingJob)
    }
    

    4.2 ReconcileJobs

    Because the status in the request is "", initializeJob runs and then reconcileResource is executed.

    func (r *TrainingJobReconciler) ReconcileJobs(job *kaiv1alpha1.TrainingJob) (result reconcile.Result, err error) {
    	oldJobStatus := job.Status.DeepCopy()
    
    	defer func() {
    		latestJob := &kaiv1alpha1.TrainingJob{}
    		err := r.Get(context.Background(), types.NamespacedName{
    			Name:      job.Name,
    			Namespace: job.Namespace,
    		}, latestJob)
    		if err == nil {
    			if latestJob.ObjectMeta.ResourceVersion != job.ObjectMeta.ResourceVersion {
    				latestJob.Status = job.Status
    				job = latestJob
    			}
    		}
    		r.updateObjectStatus(job, oldJobStatus)
    	}()
    
    	switch job.Status.Phase {
        case commonv1.JobSucceeded, commonv1.JobFailed:
          err = r.cleanup(job)
        case "", commonv1.JobCreated: // if the phase is empty or JobCreated, initialize
          r.initializeJob(job)
          err = r.reconcileResource(job)
        case commonv1.JobRunning:
          err = r.reconcileJobRunning(job)
        case commonv1.Scaling:
          err = r.executeScaling(job)
    	}
    
    	if err != nil {
    		if IsRequeueError(err) {
    			return RequeueAfterInterval(r.PollInterval, nil)
    		}
    		return RequeueAfterInterval(r.PollInterval, err)
    	}
    	return NoRequeue()
    }
    

    4.3 reconcileResource

    reconcileResource simply calls doSteps, i.e. a small state machine that continues the initialization.

    func (r *TrainingJobReconciler) reconcileResource(job *kaiv1alpha1.TrainingJob) error {
    	steps := r.newSteps()
    	err := r.doSteps(job, steps)
    	return err
    }
    

    4.4 doSteps

    newSteps builds a simple state machine: a sequence of initialization steps executed in order, with doSteps branching on the current state.

    A few points to note:

    • After Created, the states should progress as: WorkersCreated ---> WorkersReady ---> LauncherCreated ---> JobRunning.
    • Each of these is a post-condition, i.e. the state the job should reach after the corresponding action completes.
    • In the for loop, any state the job has already reached is skipped; at the first unfinished state the corresponding action is executed. So in theory the job moves step by step from WorkersCreated to JobRunning.
    • When the action for a state finishes, it marks the job with that completed condition.

    The code is as follows:

    func (r *TrainingJobReconciler) newSteps() []Step {
    	return []Step{
    		Step{
    			JobCondition: commonv1.WorkersCreated,
    			Action:       r.createTrainingJobWorkers,
    		},
    		Step{
    			JobCondition: commonv1.WorkersReady,
    			Action:       r.waitWorkersRunning,
    		},
    		Step{
    			JobCondition: commonv1.LauncherCreated,
    			Action:       r.createLauncher,
    		},
    		Step{
    			JobCondition: commonv1.JobRunning,
    			Action:       r.syncLauncherState,
    		},
    	}
    }
    
    func (r *TrainingJobReconciler) doSteps(job *kaiv1alpha1.TrainingJob, steps []Step) error {
    	for _, step := range steps {
    		if hasCondition(*job.GetJobStatus(), step.JobCondition) {
    			continue
    		}
    		err := step.Action(job)
    		break
    	}
    	return nil
    }
    

    So, concretely:

               Request("")
    K8S  +-------------------->  Reconcile
                                     +
                                     |
                                     |
                                     v
              +----------------------+---------------------+
              |                 ReconcileJobs              |
              |                      +                     |
              |                      |                     |
              |        +------------------------------+    |
              |        |             |                |    |
              |        v             v                v    |
              |  "", JobCreated   JobRunning      Scaling  |
              +--------+-----------------------------------+
                       |
                       |
                       v
               reconcileResource
                       +
                       |
                       |
                       v
             +---------+---------------+
             | doSteps                 |
             |                         |
             |                         |
             |     WorkersCreated +---------> createTrainingJobWorkers
             |                         |
             |                         |
             |     WorkersReady  +----------> waitWorkersRunning
             |                         |
             |                         |
             |     LauncherCreated +--------> createLauncher
             |                         |
             |                         |
             |     JobRunning  +------------> syncLauncherState
             |                         |
             +-------------------------+
    
    

    4.5 createTrainingJobWorkers

    In the doSteps sequence we first reach the createTrainingJobWorkers action, which sets the job condition to WorkersCreated.

    func (r *TrainingJobReconciler) createTrainingJobWorkers(job *kaiv1alpha1.TrainingJob) error {
    	if job.GetAttachMode() == kaiv1alpha1.AttachModeSSH {
    		if cm, err := r.GetOrCreateSecret(job); cm == nil || err != nil {
    			updateStatus(job.GetJobStatus(), common.JobFailed, trainingJobFailedReason, msg)
    			return nil
    		}
    	}
    
    	workers := getJobReplicasWorkers(job)
    	job.Status.TargetWorkers = workers
        
        // create the workers
    	if err := r.CreateWorkers(job, workers); err != nil {
    		updateStatus(job.GetJobStatus(), common.JobFailed, trainingJobFailedReason, msg)
    		return nil
    	}
        // set the new condition
    	updateJobConditions(job.GetJobStatus(), common.WorkersCreated, "", msg)
    	return nil
    }
    

    4.5.1 CreateWorkers

    CreateWorkers creates the workers. As described earlier, a worker consists of a service and a pod, so the creation goes through:

    • the lower-case function of the same name, createWorkers, which indirectly creates the worker services;

    • newWorker, which is passed in to build the pods.

    func (r *TrainingJobReconciler) CreateWorkers(job *kaiv1alpha1.TrainingJob, workers []string) error {
    	return r.createWorkers(job, workers, func(name string, index string) *corev1.Pod {
    		worker := newWorker(job, name, index)
    		return worker
    	})
    }
    
    
    4.5.1.1 createWorkers

    This loops and calls createWorker to generate a series of workers according to the configuration.

    func (r *TrainingJobReconciler) createWorkers(job *kaiv1alpha1.TrainingJob, workers []string, newPod PodTplGenerator) error {
        // iterate and create the workers
    	for _, podName := range workers {
    		index, err := getWorkerIndex(job.Name, podName)
    		if err != nil {
    			return err
    		}
    		_, err = r.createWorker(job, int32(index), newPod)
    		if err != nil {
    			return err
    		}
    	}
    	return nil
    }
    
    
    4.5.1.2 createWorker

    Based on the parameters, this checks whether the worker pod exists and, if not, creates that worker.

    func (r *TrainingJobReconciler) createWorker(job *kaiv1alpha1.TrainingJob, index int32, workerPodTempl PodTplGenerator) (*corev1.Pod, error) {
    	name := getWorkerName(job.Name, int(index))
    	indexStr := strconv.Itoa(int(index))
    	pod := &corev1.Pod{}
    	nsn := types.NamespacedName{
    		Name:      name,
    		Namespace: job.Namespace,
    	}
    	err := r.Get(context.Background(), nsn, pod)
    
    	if err != nil {
    		// If the worker Pod doesn't exist, we'll create it.
    		if errors.IsNotFound(err) {
                // the pod does not exist, so create it here
    			worker := workerPodTempl(name, indexStr)
    			if job.GetAttachMode() == kaiv1alpha1.AttachModeSSH {
    				util.MountRsaKey(worker, job.Name)
    			}
    			if err = r.Create(context.Background(), worker); err != nil {
    				return nil, err
    			}
    		} 
    	}
    
    	service := &corev1.Service{}
    	err = r.Get(context.Background(), nsn, service)
    	if errors.IsNotFound(err) {
            // call newService to actually create the service
    		err = r.Create(context.Background(), newService(job, name, indexStr))
    	}
    	return nil, nil
    }
    
    
    4.5.1.3 newService

    Only here do we finally reach the actual service creation, after quite a few twists and turns.

    func newService(obj interface{}, name string, index string) *corev1.Service {
    	job, _ := obj.(*kaiv1alpha1.TrainingJob)
    	labels := GenLabels(job.Name)
    	labels[labelTrainingRoleType] = worker
    	labels[replicaIndexLabel] = index
    	return &corev1.Service{ // build the service object
    		ObjectMeta: metav1.ObjectMeta{
    			Name:      name,
    			Namespace: job.Namespace,
    			Labels:    labels,
    			OwnerReferences: []metav1.OwnerReference{
    				*metav1.NewControllerRef(job, kaiv1alpha1.SchemeGroupVersionKind),
    			},
    		},
    		Spec: corev1.ServiceSpec{
    			ClusterIP: "None",
    			Selector:  labels,
    			Ports: []corev1.ServicePort{
    				{
    					Name: "ssh-port",
    					Port: 22,
    				},
    			},
    		},
    	}
    }
    
    
    

    4.5.2 newWorker

    newWorker builds the pod, following the usual pattern.

    func newWorker(obj interface{}, name string, index string) *corev1.Pod {
    	job, _ := obj.(*kaiv1alpha1.TrainingJob)
    	labels := GenLabels(job.Name)
    	labels[labelTrainingRoleType] = worker
    	labels[replicaIndexLabel] = index
    	podSpec := job.Spec.ETReplicaSpecs.Worker.Template.DeepCopy()
    
    	// keep the labels which are set in PodTemplate
    	if len(podSpec.Labels) == 0 {
    		podSpec.Labels = make(map[string]string)
    	}
    	for key, value := range labels {
    		podSpec.Labels[key] = value
    	}
    
    	// RestartPolicy=Never
    	setRestartPolicy(podSpec)
    
    	container := podSpec.Spec.Containers[0]
    
    	// if we want to use ssh, will start sshd service firstly.
    	if len(container.Command) == 0 {
    		if job.GetAttachMode() == kaiv1alpha1.AttachModeSSH {
    			container.Command = []string{"sh", "-c", "/usr/sbin/sshd  && sleep 365d"}
    		} else {
    			container.Command = []string{"sh", "-c", "sleep 365d"}
    		}
    	}
    	podSpec.Spec.Containers[0] = container
    
        // build the pod object
    	return &corev1.Pod{
    		ObjectMeta: metav1.ObjectMeta{
    			Name:        name,
    			Namespace:   job.Namespace,
    			Labels:      podSpec.Labels,
    			Annotations: podSpec.Annotations,
    			OwnerReferences: []metav1.OwnerReference{
    				*metav1.NewControllerRef(job, kaiv1alpha1.SchemeGroupVersionKind),
    			},
    		},
    		Spec: podSpec.Spec,
    	}
    }
    
    

    The logic is as follows:

               Request("")
    K8S  +-------------------->  Reconcile
                                     +
                                     |
                                     |
                                     v
              +----------------------+---------------------+
              |                 ReconcileJobs              |
              |                      +                     |
              |                      |                     |
              |        +------------------------------+    |
              |        |             |                |    |
              |        v             v                v    |
              |  "", JobCreated   JobRunning      Scaling  |
              +--------+-----------------------------------+
                       |
                       |
                       v
               reconcileResource
                       +
                       |
                       |
                       v
             +---------+---------------+
             | doSteps                 |                                           +----> createWorkers +----> createWorker +----> newService
             |                         |                                           |
             |                         |                                           +
             |     WorkersCreated +---------> createTrainingJobWorkers +-----> CreateWorkers  +------->  newWorker +------> WorkersCreated
             |                         |
             |                         |
             |     WorkersReady  +----------> waitWorkersRunning
             |                         |
             |                         |
             |     LauncherCreated +--------> createLauncher
             |                         |
             |                         |
             |     JobRunning  +------------> syncLauncherState
             |                         |
             +-------------------------+
    
    


    4.8 createLauncher

    After the workers are set up, the launcher is built, so execution continues with createLauncher.

    func (r *TrainingJobReconciler) createLauncher(job *kaiv1alpha1.TrainingJob) error {
    	if _, err := r.GetOrCreateLauncherServiceAccount(job); err != nil {
    		updateStatus(job.GetJobStatus(), commonv1.JobFailed, trainingJobFailedReason, msg)
    		return nil
    	}
    	if _, err := r.GetOrCreateLauncherRole(job, 0); err != nil {
    		updateStatus(job.GetJobStatus(), commonv1.JobFailed, trainingJobFailedReason, msg)
    		return nil
    	}
    	if _, err := r.GetLauncherRoleBinding(job); err != nil {
    		updateStatus(job.GetJobStatus(), commonv1.JobFailed, trainingJobFailedReason, msg)
    		return nil
    	}
    
    	if cm, err := r.CreateHostConfigMap(job); cm == nil || err != nil {
    		updateStatus(job.GetJobStatus(), commonv1.JobFailed, trainingJobFailedReason, msg)
    		return nil
    	}
    
    	launcher, err := r.GetLauncherJob(job)
    
    	if launcher == nil {
    		if _, err := r.CreateLauncher(job); err != nil {
    			updateStatus(job.GetJobStatus(), commonv1.JobFailed, trainingJobFailedReason, msg)
    			return nil
    		}
    	}
    
    	updateJobConditions(job.GetJobStatus(), commonv1.LauncherCreated, "", msg)
    	return nil
    }
    

    Let's pick out two key steps.

    4.8.1 CreateHostConfigMap

    This obtains (or creates) the configmap holding the host configuration.

    func (r *TrainingJobReconciler) CreateHostConfigMap(job *kaiv1alpha1.TrainingJob) (*corev1.ConfigMap, error) {
    	return r.createConfigMap(job, newHostfileConfigMap)
    }
    
    func (r *TrainingJobReconciler) createConfigMap(job *kaiv1alpha1.TrainingJob, newCm func(job *kaiv1alpha1.TrainingJob) *corev1.ConfigMap) (*corev1.ConfigMap, error) {
    	cm := &corev1.ConfigMap{}
    	name := ctrl.Request{}
    	name.NamespacedName.Namespace = job.GetNamespace()
    	name.NamespacedName.Name = job.GetName() + configSuffix
    	err := r.Get(context.Background(), name.NamespacedName, cm)
    	if errors.IsNotFound(err) {
    		if err = r.Create(context.Background(), newCm(job)); err != nil {
    			return cm, err
    		}
    	}
    	return cm, nil
    }
    
    

    4.8.2 Creating the pod

    4.8.2.1 CreateLauncher

    This creates the launcher pod.

    func (r *TrainingJobReconciler) CreateLauncher(obj interface{}) (*corev1.Pod, error) {
    	job, ok := obj.(*kaiv1alpha1.TrainingJob)
    	launcher := newLauncher(job) // build the launcher pod
    	if job.GetAttachMode() == kaiv1alpha1.AttachModeSSH {
    		util.MountRsaKey(launcher, job.Name)
    	}
    	err := r.Create(context.Background(), launcher)
    	return launcher, nil
    }
    
    
    4.8.2.2 newLauncher

    This is where the launcher pod is actually constructed.

    func newLauncher(obj interface{}) *corev1.Pod {
    	job, _ := obj.(*kaiv1alpha1.TrainingJob)
    	launcherName := job.Name + launcherSuffix
    	labels := GenLabels(job.Name)
    	labels[labelTrainingRoleType] = launcher
    	podSpec := job.Spec.ETReplicaSpecs.Launcher.Template.DeepCopy()
    	// copy the labels and annotations to pod from PodTemplate
    	if len(podSpec.Labels) == 0 {
    		podSpec.Labels = make(map[string]string)
    	}
    	for key, value := range labels {
    		podSpec.Labels[key] = value
    	}
    	podSpec.Spec.InitContainers = append(podSpec.Spec.InitContainers, initContainer(job))
    	//podSpec.Spec.InitContainers = append(podSpec.Spec.InitContainers, kubedeliveryContainer())
    
    	container := podSpec.Spec.Containers[0]
    	container.VolumeMounts = append(container.VolumeMounts,
    		corev1.VolumeMount{
    			Name:      hostfileVolumeName,
    			MountPath: hostfileMountPath,
    		},
    		corev1.VolumeMount{
    			Name:      configVolumeName,
    			MountPath: configMountPath,
    		},
    		corev1.VolumeMount{
    			Name:      kubectlVolumeName,
    			MountPath: kubectlMountPath,
    		})
    
    	if job.GetAttachMode() == kaiv1alpha1.AttachModeKubexec {
    		container.Env = append(container.Env, corev1.EnvVar{
    			Name:  "OMPI_MCA_plm_rsh_agent",
    			Value: getKubexecPath(),
    		})
    	}
    	podSpec.Spec.Containers[0] = container
    	podSpec.Spec.ServiceAccountName = launcherName
    
    	setRestartPolicy(podSpec)
    	hostfileMode := int32(0444)
    	scriptMode := int32(0555)
    
    	podSpec.Spec.Volumes = append(podSpec.Spec.Volumes,
    		corev1.Volume{
    			Name: hostfileVolumeName,
    			VolumeSource: corev1.VolumeSource{
    				EmptyDir: &corev1.EmptyDirVolumeSource{},
    			},
    		},
    		corev1.Volume{
    			Name: kubectlVolumeName,
    			VolumeSource: corev1.VolumeSource{
    				EmptyDir: &corev1.EmptyDirVolumeSource{},
    			},
    		},
    		corev1.Volume{
    			Name: configVolumeName,
    			VolumeSource: corev1.VolumeSource{
    				ConfigMap: &corev1.ConfigMapVolumeSource{
    					LocalObjectReference: corev1.LocalObjectReference{
    						Name: job.Name + configSuffix,
    					},
    					Items: []corev1.KeyToPath{
    						{
    							Key:  hostfileName,
    							Path: hostfileName,
    							Mode: &hostfileMode,
    						},
    						{
    							Key:  discoverHostName,
    							Path: discoverHostName,
    							Mode: &hostfileMode,
    						},
    						{
    							Key:  kubexeclFileName,
    							Path: kubexeclFileName,
    							Mode: &scriptMode,
    						},
    					},
    				},
    			},
    		})
    	return &corev1.Pod{
    		ObjectMeta: metav1.ObjectMeta{
    			Name:        launcherName,
    			Namespace:   job.Namespace,
    			Labels:      podSpec.Labels,
    			Annotations: podSpec.Annotations,
    			OwnerReferences: []metav1.OwnerReference{
    				*metav1.NewControllerRef(job, kaiv1alpha1.SchemeGroupVersionKind),
    			},
    		},
    		Spec: podSpec.Spec,
    	}
    }
    
    

    At this point a new training job is up and running. The extended logic is as follows:

               Request("")
    K8S  --------------------->  Reconcile
                                     +
                                     |
                                     |
                                     v
              +----------------------+---------------------+
              |                 ReconcileJobs              |
              |                      +                     |
              |                      |                     |
              |        +------------------------------+    |
              |        |             |                |    |
              |        v             v                v    |
              |  "", JobCreated   JobRunning      Scaling  |
              +--------+-----------------------------------+
                       |
                       |
                       v
               reconcileResource
                       +
                       |
                       |
                       v
             +---------+---------------+
             | doSteps                 |                                           +----> createWorkers +----> createWorker +----> newService
             |                         |                                           |
             |                         |                                           |
             |     WorkersCreated +---------> createTrainingJobWorkers +-----> CreateWorkers  +------->  newWorker +------> WorkersCreated
             |                         |
             |                         |
             |     WorkersReady  +----------> waitWorkersRunning
             |                         |
             |                         |
             |     LauncherCreated +--------> createLauncher+----> CreateHostConfigMap +-----> CreateLauncher  +------>  newLauncher
             |                         |
             |                         |
             |     JobRunning  +------------> syncLauncherState
             |                         |
             +-------------------------+
    
    


    With the creation of a new job covered, let's look at the key technical points of this article: ScaleOut and ScaleIn.

    0x05 ScaleOut

    5.1 Approach

    The ScaleOut CR looks like this:

    (figure: ScaleOut CR)

    When a ScaleOut CR is submitted, the ScaleOutController triggers Reconcile. The work is simple: using the Selector field in the ScaleOut CR, find the TrainingJob this scaler targets and set it on the CR's OwnerReferences.

    Take a ScaleOut operation as an example:

    - apiVersion: kai.alibabacloud.com/v1alpha1
      kind: ScaleOut
      metadata:
        creationTimestamp: "2020-11-04T13:54:26Z"
        name: scaleout-ptfnk
        namespace: default
        ownerReferences:
        - apiVersion: kai.alibabacloud.com/v1alpha1
          blockOwnerDeletion: true
          controller: true
          kind: TrainingJob
          name: elastic-training # points to the TrainingJob being scaled out
          uid: 075b9c4a-22f9-40ce-83c7-656b329a2b9e
      spec:
        selector:
          name: elastic-training
        toAdd:
          count: 2
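
    For comparison with the ScaleIn examples in 0x06, the ScaleOut CR a user would submit (before the controller fills in ownerReferences) looks roughly like the sketch below; the metadata.name here is only illustrative, the other fields mirror the dump above. It is submitted with kubectl create -f in the same way as the ScaleIn example later.

    apiVersion: kai.alibabacloud.com/v1alpha1
    kind: ScaleOut
    metadata:
      name: elastic-training-scaleout
    spec:
      selector:
        name: elastic-training
      toAdd:
        count: 2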
    
    

    5.2 Reconcile

    When a ScaleOut CR is submitted, the ScaleOutController triggers Reconcile, which mainly calls setScalingOwner.

    func (r *ScaleOutReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
    	scaleOut, err := getScaleOut(req.NamespacedName, r.Client)
    	if err != nil {
    		// Error reading the object - requeue the request.
    		return RequeueImmediately()
    	}
    	if scaleOut == nil || scaleOut.DeletionTimestamp != nil {
    		return NoRequeue()
    	}
    
    	if isScaleFinished(*scaleOut.GetJobStatus()) {
    		return NoRequeue()
    	}
    
      return setScalingOwner(r, scaleOut, r.PollInterval)
    }
    
    

    5.3 setScalingOwner

    setScalingOwner is one of the key pieces.

    It mainly handles the case where the ScaleOut CR has no OwnerReferences set yet, and sets one.

    The logic: using the Selector field of the ScaleOut CR, find the TrainingJob this scaler targets and set it on the CR's OwnerReferences.

    func setScalingOwner(r client.Client, scaler Scaler, pollInterval time.Duration) (ctrl.Result, error) {
    	ownerRefs := scaler.GetOwnerReferences()
    	if len(ownerRefs) == 0 {
    		trainingJob := &kaiv1alpha1.TrainingJob{}
    		nsn := types.NamespacedName{}
    		nsn.Namespace = scaler.GetNamespace()
    		nsn.Name = scaler.GetSelector().Name
    		err := r.Get(context.Background(), nsn, trainingJob)
    		gvk := kaiv1alpha1.SchemeGroupVersionKind
    		ownerRefs = append(ownerRefs, *metav1.NewControllerRef(trainingJob, schema.GroupVersionKind{Group: gvk.Group, Version: gvk.Version, Kind: gvk.Kind}))
    		scaler.SetOwnerReferences(ownerRefs)
    
    		initializeJobStatus(scaler.GetJobStatus())
    		updateJobConditions(scaler.GetJobStatus(), v1.JobCreated, "", msg)
    		err = r.Status().Update(context.Background(), scaler)
    		err = r.Update(context.Background(), scaler)
    	}
    	return NoRequeue()
    }
    
    // RequeueAfterInterval requeues after a duration when duration > 0 is specified.
    func RequeueAfterInterval(interval time.Duration, err error) (ctrl.Result, error) {
    	return ctrl.Result{RequeueAfter: interval}, err
    }
    
    

    5.4 TrainingJobController

    The TrainingJobController notices an update to a ScaleOut CR owned by the TrainingJob, which triggers the TrainingJob's Reconcile; it iterates over the ScaleIn and ScaleOut CRs whose OwnerReference points to the TrainingJob and decides, based on creation time and status time, which scale-out or scale-in to execute.

    5.4.1 Reconcile

    func (r *TrainingJobReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
    
    	rlog := r.Log.WithValues("trainingjob", req.NamespacedName)
    	// Fetch latest training job instance.
    	sharedTrainingJob := &kaiv1alpha1.TrainingJob{}
    	err := r.Get(context.Background(), req.NamespacedName, sharedTrainingJob)
    
    	trainingJob := sharedTrainingJob.DeepCopy()
    	// Check reconcile is required.
    	// No need to do reconcile or job has been deleted.
    
    	r.Scheme.Default(trainingJob)
    
    	return r.ReconcileJobs(trainingJob)
    }
    
    

    5.4.2 ReconcileJobs

    func (r *TrainingJobReconciler) ReconcileJobs(job *kaiv1alpha1.TrainingJob) (result reconcile.Result, err error) {
    	oldJobStatus := job.Status.DeepCopy()
    
    	logger.Infof("jobName: %v, phase %s", job.Name, job.Status.Phase)
    
    	defer func() {
    		latestJob := &kaiv1alpha1.TrainingJob{}
    		err := r.Get(context.Background(), types.NamespacedName{
    			Name:      job.Name,
    			Namespace: job.Namespace,
    		}, latestJob)
    		if err == nil {
    			if latestJob.ObjectMeta.ResourceVersion != job.ObjectMeta.ResourceVersion {
    				latestJob.Status = job.Status
    				job = latestJob
    			}
    		}
    		r.updateObjectStatus(job, oldJobStatus)
    	}()
    
    	switch job.Status.Phase {
    	case commonv1.JobSucceeded, commonv1.JobFailed:
    		err = r.cleanup(job)
    	case "", commonv1.JobCreated:
    		r.initializeJob(job)
    		err = r.reconcileResource(job)
    	case commonv1.JobRunning:
    		err = r.reconcileJobRunning(job)
    	case commonv1.Scaling:
    		err = r.executeScaling(job)
    	default:
    		logger.Warnf("job %s unknown status %s", job.Name, job.Status.Phase)
    	}
    
    	if err != nil {
    		if IsRequeueError(err) {
    			return RequeueAfterInterval(r.PollInterval, nil)
    		}
    		return RequeueAfterInterval(r.PollInterval, err)
    	}
    	return NoRequeue()
    }
    
    

    From here, depending on the current job phase, there are two paths: first JobRunning, then Scaling, and finally back to JobRunning.

    Let's analyze them one by one.

    5.5 JobRunning

    First we arrive at the JobRunning phase; let's walk through how it is handled.

    5.5.1 reconcileJobRunning

    func (r *TrainingJobReconciler) reconcileJobRunning(job *kaiv1alpha1.TrainingJob) error {
    	if err := r.syncLauncherState(job); err != nil {
    		return err
    	}
    	if err := r.syncWorkersState(job); err != nil {
    		return err
    	}
    
    	if job.Status.Phase == commonv1.JobRunning {
    		return r.setTrainingJobScaler(job) // the job is running, so we can start setting up the scaler
    	}
    
    	return nil
    }
    
    

    5.5.2 setTrainingJobScaler

    First the candidates are collected via availableScaleOutList and availableScaleInList, then the latest one is applied.

    func (r *TrainingJobReconciler) setTrainingJobScaler(job *kaiv1alpha1.TrainingJob) error {
    	scaleOut, err := r.availableScaleOutList(job) // find the ScaleOut list

    	scaleIn, err := r.availableScaleInList(job) // find the ScaleIn list

    	scalerList := append(scaleOut, scaleIn...) // merge them

    	// Select the latest scaling job
    	r.updateLatestScaler(job, scalerList) // apply it
    	return nil
    }
    
    

    5.5.3 updateLatestScaler

    Based on creation time and status time, find the most recent scaler.

    func (r *TrainingJobReconciler) updateLatestScaler(job *kaiv1alpha1.TrainingJob, scalers []Scaler) error {
    	var latestScaler Scaler
    	if len(scalers) == 0 {
    		return nil
    	}
    	for i, _ := range scalers {
    		scalerItem := scalers[i]
    		// pick the scaler with the latest creation timestamp
    		if latestScaler == nil || latestScaler.GetCreationTimestamp().Time.Before(scalerItem.GetCreationTimestamp().Time) {
    			latestScaler = scalerItem
    		}
    	}
    	return r.updateCurrentScaler(job, latestScaler)
    }
    
    

    5.5.4 updateCurrentScaler

    Apply the scaler that was found.

    func (r *TrainingJobReconciler) updateCurrentScaler(job *kaiv1alpha1.TrainingJob, scaleItem Scaler) error {
    	job.Status.CurrentScaler = scaleItem.GetFullName()
    	msg := fmt.Sprintf("trainingJobob(%s/%s) execute %s", job.Namespace, job.Name, scaleItem.GetFullName())
        
    	// set the scaling status
    	r.updateScalerState(scaleItem, job, newCondition(common.Scaling, scalingStartReason, msg))
    
    	if err := r.updateObjectStatus(scaleItem, nil); err != nil {
    		return err
    	}
    	return nil
    }
    
    

    5.5.5 updateScalerState

    At this point common.Scaling is set, so the next reconcile run takes the Scaling branch.

    func (r *TrainingJobReconciler) updateScalerState(scaleObj Scaler, trainingJob *kaiv1alpha1.TrainingJob, condition common.JobCondition) error {
        
    	jobPhase := common.Scaling // set common.Scaling, so the next run takes the Scaling branch
    	currentJob := scaleObj.GetFullName()
    	if condition.Type == common.ScaleSucceeded || condition.Type == common.ScaleFailed {
    		jobPhase = common.JobRunning
    		currentJob = ""
    	}
    
    	setCondition(trainingJob.GetJobStatus(), condition)
    	updateStatusPhase(trainingJob.GetJobStatus(), jobPhase)
    	updateTrainingJobCurrentScaler(trainingJob.GetJobStatus(), currentJob)
    
    	setCondition(scaleObj.GetJobStatus(), condition)
    	updateStatusPhase(scaleObj.GetJobStatus(), condition.Type)
    
    	return nil
    }
    
    

    The logic is as follows:

               1 Request("")
      K8S  +-------------------->  Reconcile  <------------------+
               2 ScaleOut CR           +                         |
      K8S  +-------------------->      |                         |
                                       |                         |
                                       v                         |
                +----------------------+---------------------+   |
                |                 ReconcileJobs              |   |
                |                      +                     |   |
                |                      |                     |   |
                |        +------------------------------+    |   |
                |     1  |             | 2            3 |    |   |
                |        v             v                v    |   |
                |  "", JobCreated   JobRunning      Scaling  |   |
                +--------+-------------+---------------------+   |
                         |             |                         |
                      1  |             | 2                       |
                         v             v                         |
                 reconcileResource   reconcileJobRunning         |
                         +             +                         |
                      1  |             | 2                       |
                         |             |                         |
                         v             v                         |
    +--------------------+----+      setTrainingJobScaler        |
    | doSteps                 |        +                         |
    |                         |        | 2                       |
    |                         |        |                         |
    |     WorkersCreated      |        v                         |
    |                         |      updateScalerState           |
    |                         |        +                         |
    |     WorkersReady        |        |                         |
    |                         |        | 2                       |
    |                         |        v                         |
    |     LauncherCreated     |      common.Scaling              |
    |                         |        +                         |
    |                         |        |                         |
    |     JobRunning          |        | 2                       |
    |                         |        |                         |
    +-------------------------+        +-------------------------+
    
    
    

    5.6 Scaling

    5.6.1 executeScaling

    Depending on the type of scaler, a different scaling path is taken.

    func (r *TrainingJobReconciler) executeScaling(job *kaiv1alpha1.TrainingJob) error {
    	if err := r.syncLauncherState(job); err != nil {
    		return err
    	}
    
    	if job.Status.CurrentScaler == "" {
    		updateStatusPhase(job.GetJobStatus(), common.JobRunning)
    		return nil
    	}
    
    	if isFinished(*job.GetJobStatus()) {
    		return nil
    	}
    
    	scalerType, scalerName := getScalerName(job.Status.CurrentScaler)
    	// handle ScaleIn and ScaleOut differently
    	if scalerType == "ScaleIn" {
    		scaleIn, err := getScaleIn(scalerName, r)
    
    		if scaleIn == nil || isScaleFinished(*scaleIn.GetJobStatus()) {
    			finishTrainingScaler(job.GetJobStatus())
    			return nil
    		}
    
    		oldStatus := scaleIn.Status.DeepCopy()
    		defer r.updateObjectStatus(scaleIn, oldStatus)
    
    		// perform the actual scale-in
    		if err = r.executeScaleIn(job, scaleIn); err != nil {
    			return err
    		}
    	} else if scalerType == "ScaleOut" {
    		scaleOut, err := getScaleOut(scalerName, r)
    
    		if scaleOut == nil || isScaleFinished(*scaleOut.GetJobStatus()) {
    			finishTrainingScaler(job.GetJobStatus())
    			return nil
    		}
    
    		oldStatus := scaleOut.Status.DeepCopy()
    		defer r.updateObjectStatus(scaleOut, oldStatus)
    
    		// perform the actual scale-out
    		if err = r.executeScaleOut(job, scaleOut); err != nil {
    		}
    	}
    	return nil
    }
    
    

    5.6.2 executeScaleOut

    Perform the scale-out:

    • setScaleOutWorkers adds the new pods to scaleOut.Status.AddPods.
    • workersAfterScaler computes the final worker list.
    • executeScaleScript performs the scale operation.
    func (r *TrainingJobReconciler) executeScaleOut(job *kaiv1alpha1.TrainingJob, scaleOut *kaiv1alpha1.ScaleOut) error {
    
      initializeJobStatus(scaleOut.GetJobStatus())
    
    	if err := r.validateScaleOut(scaleOut); err != nil {
    		r.updateScalerFailed(scaleOut, job, err.Error())
    		return err
    	}
    
    	if err := r.setScaleOutWorkers(job, scaleOut); err != nil {
    		return err
    	}
    
    	err := r.ScaleOutWorkers(job, scaleOut)
    	if err != nil {
    		msg := fmt.Sprintf("%s create scaleout workers failed, error: %v", scaleOut.GetFullName(), err)
    		r.ScaleOutFailed(job, scaleOut, msg)
    		return err
    	}
    
    	scaleOutWorkers, err := r.getScalerOutWorkers(job, scaleOut)
    
    	workerStatuses, _ := r.workerReplicasStatus(scaleOut.GetJobStatus(), scaleOutWorkers)
    
    	if workerStatuses.Active < *scaleOut.Spec.ToAdd.Count {
    		if IsScaleOutTimeout(scaleOut) {
    			msg := fmt.Sprintf("scaleout job %s execution timeout", scaleOut.GetFullName())
    			r.ScaleOutFailed(job, scaleOut, msg)
    		}
    		return NewRequeueError(fmt.Errorf("wait for workers running"))
    	}
    
    	hostWorkers := r.workersAfterScaler(job.Status.CurrentWorkers, scaleOut)
    
    	// execute the scale script on the launcher
    	if err := r.executeScaleScript(job, scaleOut, hostWorkers); err != nil {
    		msg := fmt.Sprintf("%s execute script failed, error: %v", scaleOut.GetFullName(), err)
    		r.ScaleOutFailed(job, scaleOut, msg)
    		return err
    	} else {
    		job.Status.TargetWorkers = r.workersAfterScaler(job.Status.TargetWorkers, scaleOut)
    		r.updateScalerSuccessd(scaleOut, job)
    	}
    
    	return nil
    }
    
    

    5.6.3 executeScaleScript

    This calls hostfileUpdateScript to update the hostfile;

    it then calls executeOnLauncher to run the resulting script on the launcher.

    func (r *TrainingJobReconciler) executeScaleScript(trainingJob *kaiv1alpha1.TrainingJob, scaler Scaler, workers []string) error {
    	if isScriptExecuted(*scaler.GetJobStatus()) {
    		return nil
    	}
    	msg := fmt.Sprintf("trainingjob(%s/%s): execute script on launcher for %s", trainingJob.Namespace, trainingJob.Name, scaler.GetFullName())
    
    	slots := getSlots(trainingJob)
    	scriptSpec := scaler.GetScriptSpec()
    
    	var script string
    	// build the script
    	if scriptSpec.Script != "" {
    		script = scalerScript(scriptSpec.GetTimeout(), scriptSpec.Env, scriptSpec.Script, scaler.GetPodNames(), slots)
    	} else {
    		hostfilePath := getHostfilePath(trainingJob)
    		script = hostfileUpdateScript(hostfilePath, workers, slots)
    	}
    
    	// run the script on the launcher
    	_, _, err := r.executeOnLauncher(trainingJob, script)
    
    	updateJobConditions(scaler.GetJobStatus(), common.ScriptExecuted, "", msg)
    	return nil
    }
    
    
    5.6.3.1 hostfileUpdateScript

    This produces the final script string.

    func hostfileUpdateScript(hostfile string, workers []string, slot int) string {
    	return fmt.Sprintf(
    		`echo '%s' > %s`, getHostfileContent(workers, slot), hostfile)
    }
    
    
    5.6.3.2 getHostfileContent

    Build the hostfile content.

    func getHostfileContent(workers []string, slot int) string {
    	var buffer bytes.Buffer
    	for _, worker := range workers {
    		buffer.WriteString(fmt.Sprintf("%s:%d\n", worker, slot))
    	}
    	return buffer.String()
    }
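
    For example, assuming the elastic-training job has two workers and one slot per worker (the slot count actually comes from getSlots(trainingJob)), the generated hostfile content would look like:

    elastic-training-worker-0:1
    elastic-training-worker-1:1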
    
    
    5.6.3.3 executeOnLauncher

    Execute the script on the launcher pod.

    func (r *TrainingJobReconciler) executeOnLauncher(trainingJob *kaiv1alpha1.TrainingJob, script string) (string, string, error) {
    	var err error
    	var launcherPod *corev1.Pod
    	if launcherPod, err = r.GetLauncherJob(trainingJob); err != nil {
    	}
    
    	if launcherPod != nil {
    		stdOut, stdErr, err := kubectlOnPod(launcherPod, script)
    		return stdOut, stdErr, nil
    	}
    	return "", "", nil
    }
    
    
    
    5.6.3.4 kubectlOnPod

    Run the command inside the launcher pod's container.

    func kubectlOnPod(pod *corev1.Pod, cmd string) (string, string, error) {
    	cmds := []string{
    		"/bin/sh",
    		"-c",
    		cmd,
    	}
    	stdout, stderr, err := util.ExecCommandInContainerWithFullOutput(pod.Name, pod.Spec.Containers[0].Name, pod.Namespace, cmds)
    	if err != nil {
    		return stdout, stderr, err
    	}
    	return stdout, stderr, nil
    }
    
    

    The logic is as follows:

               1 Request("")
      K8S  +-------------------->  Reconcile  <------------------+
               2 ScaleOut CR           +                         |
      K8S  +-------------------->      |                         |
                                       |                         |
                                       v                         |
                +----------------------+---------------------+   |
                |                 ReconcileJobs              |   |
                |                      +                     |   |
                |                      |                     |   |
                |        +------------------------------+    |   |
                |     1  |             | 2            3 |    |   |
                |        v             v                v    |   |   3
                |  "", JobCreated   JobRunning      Scaling +----------->  executeScaling
                +--------+-------------+---------------------+   |              +
                         |             |                         |              |
                      1  |             | 2                       |              | 3
                         v             v                         |              v
                 reconcileResource   reconcileJobRunning         |        executeScaleOut
                         +             +                         |              +
                      1  |             | 2                       |              |
                         |             |                         |              | 3
                         v             v                         |              v
    +--------------------+----+      setTrainingJobScaler        |      executeScaleScript
    | doSteps                 |        +                         |              +
    |                         |        | 2                       |              |
    |                         |        |                         |              | 3
    |     WorkersCreated      |        v                         |              v
    |                         |      updateScalerState           |     hostfileUpdateScript
    |                         |        +                         |              +
    |     WorkersReady        |        |                         |              | 3
    |                         |        | 2                       |              |
    |                         |        v                         |              v
    |     LauncherCreated     |      common.Scaling              |       executeOnLauncher
    |                         |        +                         |              +
    |                         |        |                         |              |
    |     JobRunning          |        | 2                       |              | 3
    |                         |        |                         |              v
    +-------------------------+        +-------------------------+         kubectlOnPod
    
    
    

    0x06 ScaleIn

    6.1 Approach

    The ScaleIn CR looks like this:

    (figure: ScaleIn CR)

    When scaling in, the workers to remove can be specified via the spec.toDelete.count or spec.toDelete.podNames field of the ScaleIn CR.

    If count is used, workers are selected by index from high to low; for example, with three workers and count: 1, the worker with the highest index is removed.

    apiVersion: kai.alibabacloud.com/v1alpha1
    kind: ScaleIn
    metadata:
      name: scalein-workers
    spec:
      selector:
        name: elastic-training
      toDelete:
        count: 1
    
    

    To scale in specific workers, configure podNames:

    apiVersion: kai.alibabacloud.com/v1alpha1
    kind: ScaleIn
    metadata:
      name: scalein-workers
    spec:
      selector:
        name: elastic-training
      toDelete:
        podNames:
        - elastic-training-worker-1
    
    

    Run a scale-in example that removes 1 worker by count:

    kubectl create -f examples/scale_in_count.yaml
    
    

    6.2 Reconcile

    When a ScaleIn CR is submitted, the controller triggers Reconcile, which mainly calls setScalingOwner.

    func (r *ScaleInReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
    	//silog := r.Log.WithValues("scalein", req.NamespacedName)
    	scaleIn, err := getScaleIn(req.NamespacedName, r.Client)
    
    	if isScaleFinished(*scaleIn.GetJobStatus()) {
    		return NoRequeue()
    	}
    
    	// the code above is mostly validation
    	return setScalingOwner(r, scaleIn, r.PollInterval)
    }
    
    

    6.3 setScalingOwner

    setScalingOwner is one of the key pieces.

    It mainly handles the case where the ScaleIn CR has no OwnerReferences set yet, and sets one.

    The logic: using the Selector field of the ScaleIn CR, find the TrainingJob this scaler targets and set it on the CR's OwnerReferences.

    The error-handling code is stripped below.

    func setScalingOwner(r client.Client, scaler Scaler, pollInterval time.Duration) (ctrl.Result, error) {
    	ownerRefs := scaler.GetOwnerReferences()
    	if len(ownerRefs) == 0 {
    		trainingJob := &kaiv1alpha1.TrainingJob{}
    		nsn := types.NamespacedName{}
    		nsn.Namespace = scaler.GetNamespace()
    		nsn.Name = scaler.GetSelector().Name
    		err := r.Get(context.Background(), nsn, trainingJob)
    
    		gvk := kaiv1alpha1.SchemeGroupVersionKind
    		ownerRefs = append(ownerRefs, *metav1.NewControllerRef(trainingJob, schema.GroupVersionKind{Group: gvk.Group, Version: gvk.Version, Kind: gvk.Kind}))
    		scaler.SetOwnerReferences(ownerRefs)
    
    		initializeJobStatus(scaler.GetJobStatus())
    		updateJobConditions(scaler.GetJobStatus(), v1.JobCreated, "", msg)
    		err = r.Status().Update(context.Background(), scaler)
    		err = r.Update(context.Background(), scaler)
    	}
    	return NoRequeue()
    }
    
    

    6.4 executeScaleIn

    The JobRunning handling is similar to ScaleOut, so we skip it and go straight to executeScaleIn.

    When scaling in, the workers to remove can be specified via spec.toDelete.count or spec.toDelete.podNames in the ScaleIn CR.

    If count is used, workers are removed by index from high to low.

    In the code this maps to:

    setsSaleInToDelete decides which workers to delete;

    executeScaleScript runs the scale script;

    DeleteWorkers deletes the workers.

    func (r *TrainingJobReconciler) executeScaleIn(job *kaiv1alpha1.TrainingJob, scaleIn *kaiv1alpha1.ScaleIn) error {
    	if scaleIn.DeletionTimestamp != nil || isScaleFinished(*scaleIn.GetJobStatus()) {
    		logger.Info("reconcile cancelled, scalein does not need to do reconcile or has been deleted")
    		return nil
    	}
    
    	initializeJobStatus(scaleIn.GetJobStatus())
    
    	//TODO: Validate the scalein count for minSize
    	err := r.setsSaleInToDelete(job, scaleIn)
    
    	currentWorkers := r.workersAfterScaler(job.Status.CurrentWorkers, scaleIn)
    
    	// execute scalein script
    	if err := r.executeScaleScript(job, scaleIn, currentWorkers); err != nil {
    		msg := fmt.Sprintf("%s execute script failed, error: %v", scaleIn.GetFullName(), err)
    		r.updateScalerFailed(scaleIn, job, msg)
    		return nil
    	}
    
    	toDeleteWorkers := scaleIn.GetPodNames()
    	remainWorkers := false
    	if scaleIn.Spec.Script == "" {
    		if shutdownWorkers, err := r.checkWorkerShutdown(job, toDeleteWorkers); err != nil {
    			return err
    		} else {
    			if len(toDeleteWorkers) != len(shutdownWorkers) {
    				remainWorkers = true
    				toDeleteWorkers = shutdownWorkers
    			}
    		}
    	}
    	if err := r.DeleteWorkers(job, toDeleteWorkers); err != nil {
    		msg := fmt.Sprintf("%s delete resource failed, error: %v", scaleIn.GetFullName(), err)
    		r.updateScalerFailed(scaleIn, job, msg)
    		return nil
    	}
    
    	// wait pods deleted
    	deleted, _ := r.isWorkersDeleted(job.Namespace, scaleIn.GetPodNames())
    	if deleted {
    		job.Status.TargetWorkers = r.workersAfterScaler(job.Status.TargetWorkers, scaleIn)
    		job.Status.CurrentWorkers = currentWorkers
    		r.updateScalerSuccessd(scaleIn, job)
    		return nil
    	}
    
    	if remainWorkers {
    		msg := "wait for workers process shutdown"
    		logger.Info(msg)
    		return NewRequeueError(fmt.Errorf(msg))
    	}
    
    	return nil
    }
    
    

    6.5 setsSaleInToDelete

    The workers to remove are specified via the spec.toDelete.count or spec.toDelete.podNames field of the ScaleIn CR.

    func (r *TrainingJobReconciler) setsSaleInToDelete(job *kaiv1alpha1.TrainingJob, scaleIn *kaiv1alpha1.ScaleIn) error {
    	podNames := scaleIn.Status.ToDeletePods
    	if len(podNames) != 0 {
    		return /*filterPodNames(workers, podNames, false), */ nil
    	}
    	workers, err := r.GetWorkerPods(job)
    
    	toDelete := scaleIn.Spec.ToDelete
    
    	if toDelete.PodNames != nil {
    		workers = filterPodNames(workers, toDelete.PodNames, false)
    	} else if toDelete.Count > 0 {
    		if toDelete.Count < len(workers) {
    			allPodNames := getSortPodNames(job.Name, workers)
    			deletePodNames := allPodNames[len(workers)-toDelete.Count:]
    			workers = filterPodNames(workers, deletePodNames, false)
    		} 
    	} 
      
    	for _, worker := range workers {
    		scaleIn.Status.ToDeletePods = append(scaleIn.Status.ToDeletePods, worker.Name)
    	}
    
    	return nil
    }
    
    
    

    6.6 DeleteWorkers

    Delete the worker services and pods.

    func (r *TrainingJobReconciler) DeleteWorkers(trainingJob *kaiv1alpha1.TrainingJob, workers []string) error {
    	if err := r.DeleteWorkerServices(trainingJob, workers); err != nil {
    		return fmt.Errorf("delete services failed: %++v", err)
    	}
    
    	if err := r.DeleteWorkerPods(trainingJob, workers); err != nil {
    		return fmt.Errorf("delete pods failed: %++v", err)
    	}
    	return nil
    }
    
    

    6.7 DeleteWorkerPods

    Delete the pods.

    func (r *TrainingJobReconciler) DeleteWorkerPods(job *kaiv1alpha1.TrainingJob, pods []string) error {
    	workerPods, err := r.GetWorkerPods(job)
    
    	if pods != nil {
    		workerPods = filterPodNames(workerPods, pods, false)
    	}
    	for _, pod := range workerPods {
    		deleteOptions := &client.DeleteOptions{GracePeriodSeconds: utilpointer.Int64Ptr(0)}
    		if err := r.Delete(context.Background(), &pod, deleteOptions); err != nil && !errors.IsNotFound(err) {
    			r.recorder.Eventf(job, corev1.EventTypeWarning, trainingJobFailedReason, "Error deleting worker %s: %v", pod.Name, err)
    			//return err
    		}
    		r.recorder.Eventf(job, corev1.EventTypeNormal, trainingJobSucceededReason, "Deleted pod %s", pod.Name)
    	}
    	return nil
    }
    
    

    The overall logic is as follows:

          1 Request("")
     K8S-----------------> Reconcile  <------------------+
          2 ScaleOut CR        +                         |
     K8S----------------->     |                         |
                               |                         |
                               v                         |
        +----------------------+---------------------+   |
        |                 ReconcileJobs              |   |
        |                      +                     |   |
        |                      |                     |   |
        |        +------------------------------+    |   |
        |     1  |             | 2            3 |    |   |
        |        v             v                v    |   | 3
        |  "", JobCreated   JobRunning      Scaling +---------> executeScaling -----+
        +--------+-------------+---------------------+   |          +               |
                 |             |                         |          |               |
              1  |             | 2                       |          | 3             | 4
                 v             v                         |          v               v
         reconcileResource   reconcileJobRunning         |    executeScaleOut  executeScaleIn
                 +             +                         |          +               +
              1  |             | 2                       |          |               |
                 |             |                         |          | 3             | 4
                 v             v                         |          v               v
    +------------+--------+  setTrainingJobScaler        | executeScaleScript executeScaleScript
    | doSteps             |    +                         |          +               +
    |                     |    | 2                       |          |               |
    |                     |    |                         |          | 3             | 4
    |    WorkersCreated   |    v                         |          v               v
    |                     |  updateScalerState           | hostfileUpdateScript  DeleteWorkers
    |                     |    +                         |          +               +
    |    WorkersReady     |    |                         |          | 3             | 4
    |                     |    | 2                       |          |               |
    |                     |    v                         |          v               v
    |    LauncherCreated  |  common.Scaling              |   executeOnLauncher  DeleteWorkerPods
    |                     |    +                         |          +               +
    |                     |    |                         |          |               |
    |    JobRunning       |    | 2                       |          | 3             | 4
    |                     |    |                         |          v               v
    +---------------------+    +-------------------------+     kubectlOnPod      Delete
    
    
    

    This concludes the analysis of the Horovod series. The next article will start analyzing parameter servers; stay tuned.

    0xEE Personal information

    ★★★★★★ Thoughts on life and technology ★★★★★★

    WeChat official account: 罗西的思考 (Rossi's Thinking)

    If you want to be notified when new articles are published, or to see the technical material I recommend, please follow this account.


    0xFF References

    ElasticDL analysis (ElasticDL 分析)

    Elastic Training Operator: a tool for elastic deep learning training on Kubernetes (在 Kubernetes 上弹性深度学习训练利器 -- Elastic Training Operator)
