zoukankan      html  css  js  c++  java
  • 解锁云原生 AI 技能

     

    按照上篇文章《解锁云原生 AI 技能 | 在 Kubernetes 上构建机器学习系统》搭建了一套 Kubeflow Pipelines 之后,我们一起小试牛刀,用一个真实的案例,学习如何开发一套基于 Kubeflow Pipelines 的机器学习工作流。

    准备工作

    机器学习工作流是一个任务驱动的流程,同时也是数据驱动的流程,这里涉及到数据的导入和准备、模型训练 Checkpoint 的导出评估、到最终模型的导出。这就需要分布式存储作为传输的媒介,此处使用 NAS 作为分布式存储。

    • 创建分布式存储,这里以 NAS 为例。此处 NFS_SERVER_IP 需要替换成真实 NAS 服务器地址
    1. 创建阿里云 NAS 服务,可以参考文档
    2. 需要在 NFS Server 中创建 /data
    # mkdir -p /nfs
    # mount -t nfs -o vers=4.0 NFS_SERVER_IP:/ /nfs
    # mkdir -p /data
    # cd /
    # umount /nfs
    1. 创建对应的 Persistent Volume
    # cat nfs-pv.yaml
    apiVersion: v1
    kind: PersistentVolume
    metadata:
      name: user-susan
      labels:
        user-susan: pipelines
    spec:
      persistentVolumeReclaimPolicy: Retain
      capacity:
        storage: 10Gi
      accessModes:
      - ReadWriteMany
      nfs:
        server: NFS_SERVER_IP
        path: "/data"
        
    # kubectl create -f nfs-pv.yaml
    创建 Persistent Volume Claim
    # cat nfs-pvc.yaml
    apiVersion: v1
    kind: PersistentVolumeClaim
    metadata:
      name: user-susan
      annotations:
        description: "this is the mnist demo"
        owner: Tom
    spec:
      accessModes:
        - ReadWriteMany
      resources:
        requests:
           storage: 5Gi
      selector:
        matchLabels:
          user-susan: pipelines
    # kubectl create -f nfs-pvc.yaml

    开发 Pipeline

    由于 Kubeflow Pipelines 提供的例子都是依赖于 Google 的存储服务,这导致国内的用户无法真正体验 Pipelines 的能力。为此,阿里云容器服务团队提供了基于 NAS 存储训练 MNIST 模型的例子,方便您在阿里云上使用和学习 Kubeflow Pipelines。具体步骤分 3 步: 

    • (1) 下载数据 
    • (2) 利用 TensorFlow 进行模型训练 
    • (3) 模型导出

    在这 3 个步骤中,后一个步骤都依赖于前一个步骤而完成。

    Kubeflow Pipelines 中可以用 Python 代码描述这样一个流程, 完整代码可以查看 standalone_pipeline.py

    我们在例子中使用了基于开源项目 Arena 的 arena_op ,这是对于 Kubeflow 默认的 container_op 封装,它能够实现对于分布式训练 MPI 和 PS 模式的无缝衔接,另外也支持使用 GPU 和 RDMA 等异构设备和分布式存储的简单接入,同时方便从 git 源同步代码,是一个比较实用的工具 API。 

    @dsl.pipeline(
      name='pipeline to run jobs',
      description='shows how to run pipeline jobs.'
    )
    def sample_pipeline(learning_rate='0.01',
        dropout='0.9',
        model_version='1',
        commit='f097575656f927d86d99dd64931042e1a9003cb2'):
      """A pipeline for end to end machine learning workflow."""
      data=["user-susan:/training"]
      gpus=1
    # 1. prepare data
      prepare_data = arena.standalone_job_op(
        name="prepare-data",
        image="byrnedo/alpine-curl",
        data=data,
        command="mkdir -p /training/dataset/mnist && 
      cd /training/dataset/mnist && 
      curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-images-idx3-ubyte.gz && 
      curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-labels-idx1-ubyte.gz && 
      curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-images-idx3-ubyte.gz && 
      curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-labels-idx1-ubyte.gz")
      # 2. downalod source code and train the models
      train = arena.standalone_job_op(
        name="train",
        image="tensorflow/tensorflow:1.11.0-gpu-py3",
        sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
        env=["GIT_SYNC_REV=%s" % (commit)],
        gpus=gpus,
        data=data,
        command='''
        echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/main.py 
        --max_steps 500 --data_dir /training/dataset/mnist 
        --log_dir /training/output/mnist  --learning_rate %s 
        --dropout %s''' % (prepare_data.output, learning_rate, dropout),
        metrics=["Train-accuracy:PERCENTAGE"])
      # 3. export the model
      export_model = arena.standalone_job_op(
        name="export-model",
        image="tensorflow/tensorflow:1.11.0-py3",
        sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
        env=["GIT_SYNC_REV=%s" % (commit)],
        data=data,
        command="echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/export_model.py --model_version=%s --checkpoint_path=/training/output/mnist /training/output/models" % (train.output, model_version))

    Kubeflow Pipelines 会将上面的代码转化成一个有向无环图 (DAG), 其中的每一个节点就是 Component (组件),而 Component (组件)之间的连线代表它们之间的依赖关系。从 Pipelines UI 可以看到 DAG 图:

    首先具体理解一下数据准备的部分,这里我们提供了 arena.standalone_job_op 的 Python API,  需要指定该步骤的名称: name; 需要使用的容器镜像: image; 要使用的数据以及其对应到容器内部的挂载目录: data。

    这里的 data 是一个数组格式, 如 data=["user-susan:/training"],表示可以挂载到多个数据。 其中 user-susan 是之前创建的 Persistent Volume Claim, 而 /training 为容器内部的挂载目录。

    prepare_data = arena.standalone_job_op(
        name="prepare-data",
        image="byrnedo/alpine-curl",
        data=data,
        command="mkdir -p /training/dataset/mnist && 
      cd /training/dataset/mnist && 
      curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-images-idx3-ubyte.gz && 
      curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-labels-idx1-ubyte.gz && 
      curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-images-idx3-ubyte.gz && 
      curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-labels-idx1-ubyte.gz")

    而上述步骤实际上是从指定地址利用 curl 下载数据到分布式存储对应的目录 /training/dataset/mnist,请注意这里的 /training 为分布式存储的根目录,类似大家熟悉的根 mount 点;而 /training/dataset/mnist 是子目录。其实后面的步骤可以通过使用同样的根 mount 点,读到数据,进行运算。

    第二步是利用下载到分布式存储的数据,并通过 git 指定固定 commit id 下载代码,并进行模型训练。

    train = arena.standalone_job_op(
        name="train",
        image="tensorflow/tensorflow:1.11.0-gpu-py3",
        sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
        env=["GIT_SYNC_REV=%s" % (commit)],
        gpus=gpus,
        data=data,
        command='''
        echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/main.py 
        --max_steps 500 --data_dir /training/dataset/mnist 
        --log_dir /training/output/mnist  --learning_rate %s 
        --dropout %s''' % (prepare_data.output, learning_rate, dropout),
        metrics=["Train-accuracy:PERCENTAGE"])

    可以看到这个步骤比数据准备要相对复杂一点,除了和第一步骤中的 name, image,  data 和 command 一样需要指定之外,在模型训练步骤中,还需要指定:

    • 获取代码的方式: 从可重现实验的角度来看,对于运行试验代码的追本溯源,是非常重要的一环。可以在 API 调用时指定 sync_source 的 git 代码源,同时通过设定 env 中 GIT_SYNC_REV 指定训练代码的 commit id;
    • gpu:  默认为 0,就是不使用 GPU;如果为大于 0 的整数值,就代表该步骤需要这个数量的 GPU 数;
    • metrics:  同样是从可重现和可比较的实验目的出发,用户可以将需要的一系列指标导出,并且通过 Pipelines UI 进行直观的显示和比较。具体使用方法分为两步:1. 在调用 API 时以数组的形式指定要收集指标的 metrics name 和指标的展示格式 PERCENTAGE 或者是 RAW,比如 metrics=["Train-accuracy:PERCENTAGE"]。 2. 由于 Pipelines 默认会从 stdout 日志中收集指标,你需要在真正运行的模型代码中输出 {metrics name}={value} 或者 {metrics name}:{value}, 可以参考具体样例代码

    值得注意的是:

    在本步骤中指定了和 prepare_data 相同的 data 参数 ["user-susan:/training"],就可以在训练代码中读到对应的数据,比如 --data_dir /training/dataset/mnist

    另外由于该步骤依赖于 prepare_data,可以在方法中通过指定 prepare_data.output 表示两个步骤的依赖关系。

    最后 export_model 是基于 train 训练产生的 checkpoint,生成训练模型:

    export_model = arena.standalone_job_op(
        name="export-model",
        image="tensorflow/tensorflow:1.11.0-py3",
        sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
        env=["GIT_SYNC_REV=%s" % (commit)],
        data=data,
        command="echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/export_model.py --model_version=%s --checkpoint_path=/training/output/mnist /training/output/models" % (train.output, model_version))

    export_model 和第二步 train 类似,甚至要更为简单,它只是从 git 同步模型导出代码并且利用共享目录 /training/output/mnist 中的 checkpoint 执行模型导出。

    整个工作流程看起来还是很直观的, 下面就可以定义一个 Python 方法将整个流程贯穿在一起:

    @dsl.pipeline(
      name='pipeline to run jobs',
      description='shows how to run pipeline jobs.'
    )
    def sample_pipeline(learning_rate='0.01',
        dropout='0.9',
        model_version='1',
        commit='f097575656f927d86d99dd64931042e1a9003cb2'):

    @dsl.pipeline 是表示工作流的装饰器,这个装饰器中需要定义两个属性,分别是 name 和  description

    入口方法 sample_pipeline 中定义了 4 个参数: learning_ratedropoutmodel_version 和 commit, 分别可以在上面的 train 和 export_model 阶段使用。这里的参数的值实际上是  dsl.PipelineParam 类型,定义成 dsl.PipelineParam 的目的在于可以通过 Kubeflow Pipelines 的原生 UI 将其转换成输入表单,表单的关键字是参数名称,而默认值为参数的值。值得注意的是,这里的 dsl.PipelineParam 对应值实际上只能是字符串和数字型;而数组和 map,以及自定义类型都是无法通过转型进行变换的。

    实际上,这些参数都可以在用户提交工作流时进行覆盖,以下就是提交工作流对应的 UI:

    提交 Pipeline

    您可以在自己的 Kubernetes 内将前面开发工作流的 Python DSL 提交到 Kubeflow Pipelines 服务中, 实际提交代码很简单:

    KFP_SERVICE="ml-pipeline.kubeflow.svc.cluster.local:8888"
      import kfp.compiler as compiler
      compiler.Compiler().compile(sample_pipeline, __file__ + '.tar.gz')
      client = kfp.Client(host=KFP_SERVICE)
      try:
        experiment_id = client.get_experiment(experiment_name=EXPERIMENT_NAME).id
      except:
        experiment_id = client.create_experiment(EXPERIMENT_NAME).id
      run = client.run_pipeline(experiment_id, RUN_ID, __file__ + '.tar.gz',
                                params={'learning_rate':learning_rate,
                                         'dropout':dropout,
                                        'model_version':model_version,
                                        'commit':commit})

    利用 compiler.compile 将 Python 代码编译成执行引擎 (Argo) 识别的 DAG 配置文件;

    通过 Kubeflow Pipeline 的客户端创建或者找到已有的实验,并且提交之前编译出的 DAG 配置文件。

    在集群内准备一个 python3 的环境,并且安装 Kubeflow Pipelines SDK:

    # kubectl create job pipeline-client --namespace kubeflow --image python:3 -- sleep infinity
    # kubectl  exec -it -n kubeflow $(kubectl get po -l job-name=pipeline-client -n kubeflow | grep -v NAME| awk '{print $1}') bash

    登录到 Python3 的环境后,执行如下命令,连续提交两个不同参数的任务:

    # pip3 install http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp/0.1.14/kfp.tar.gz --upgrade
    # pip3 install http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp-arena/kfp-arena-0.4.tar.gz --upgrade
    # curl -O https://raw.githubusercontent.com/cheyang/pipelines/update_standalone_sample/samples/arena-samples/standalonejob/standalone_pipeline.py
    # python3 standalone_pipeline.py --learning_rate 0.0001 --dropout 0.8 --model_version 2
    # python3 standalone_pipeline.py --learning_rate 0.0005 --dropout 0.8 --model_version 3

    查看运行结果

    登录到 Kubeflow Pipelines 的 UI: https://{pipeline地址}/pipeline/#/experiments, 比如:

    https://11.124.285.171/pipeline/#/experiments

    点击 Compare runs 按钮,可以比较两个实验的输入、花费的时间和精度等一系列指标。让实验可追溯是让实验可重现的第一步,而利用 Kubeflow Pipelines 本身的实验管理能力则是开启实验可重现的第一步。

    总结

    实现一个可以运行的 Kubeflow Pipeline 需要的步骤是:

    1. 构建 Pipeline (流水线)中需要的最小执行单元 Component (组件),如果是利用原生定义的 dsl.container_ops, 需要构建两部分代码:
    • 构建运行时代码:通常是为每个步骤构建容器镜像,作为 Pipelines 和真正执行业务逻辑代码之间的适配器。它所做的事情为获取 Pipelines 上下文的输入参数,调用业务逻辑代码,并且将需要传递到下个步骤的输出按照 Pipelines 的规则放到容器内的指定位置,由底层工作流组件负责传递。 这样产生的结果是运行时代码与业务逻辑代码会耦合在一起。可以参考 Kubeflow Pipelines 的例子
    • 构建客户端代码:这个步骤通常是长成下面的样子, 熟悉 Kubernetes 的朋友会发现这个步骤实际上就是在编写 Pod Spec:
    container_op = dsl.ContainerOp(
            name=name,
            image='<train-image>',
            arguments=[
                '--input_dir', input_dir,
                '--output_dir', output_dir,
                '--model_name', model_name,
                '--model_version', model_version,
                '--epochs', epochs
            ],
            file_outputs={'output': '/output.txt'}
        )
    container_op.add_volume(k8s_client.V1Volume(
                host_path=k8s_client.V1HostPathVolumeSource(
                    path=persistent_volume_path),
                name=persistent_volume_name))
    container_op.add_volume_mount(k8s_client.V1VolumeMount(
                mount_path=persistent_volume_path,
                name=persistent_volume_name))

    利用原生定义的 dsl.container_ops 的好处在于灵活,由于开放了和 Pipelines 的交互接口,用户可以在 container_ops 这个层面做许多事情。但是它的问题在于:

    • 复用度低。每个 Component 都需要构建镜像和开发运行时代码;
    • 复杂度高。使用者需要了解 Kubernetes 的概念,比如 resource limit,  PVC,  node selector 等一系列概念;
    • 支持分布式训练困难。由于 container_op 为单容器操作,如果需要支持分布式训练就需要在 container_ops 中提交和管理类似 TFJob 的任务。这里会带来复杂度和安全性的双重挑战,复杂度比较好理解,安全性是说提交 TFJob 这类任务的权限会需要开放额外的权限给 Pipeline 的开发者。

    另一种方式是使用 arena_op 这种可以重用的 Component API,它使用通用运行时代码,可以免去重复构建运行时代码的工作;同时利用通用一套的 arena_op API 简化用户的使用;也支持 Parameter Server 和 MPI 等场景。建议您使用这种方式编译 Pipelines。

    1. 将构建好的 Component (组件)拼接成 Pipeline (流水线);
    2. 将 Pipeline (流水线)编译成 Argo 的执行引擎 (Argo) 识别的 DAG 配置文件, 并提交 DAG 配置文件到 Kubeflow Pipelines,  利用 Kubeflow Pipelines 自身的 UI 查看流程结果。
  • 相关阅读:
    高斯消元学习
    HDU 4596 Yet another end of the world(解一阶不定方程)
    Codeforces Round #318 div2
    HDU 4463 Outlets(一条边固定的最小生成树)
    HDU 4458 Shoot the Airplane(计算几何 判断点是否在n边形内)
    HDU 4112 Break the Chocolate(简单的数学推导)
    HDU 4111 Alice and Bob (博弈)
    POJ 2481 Cows(线段树单点更新)
    HDU 4288 Coder(STL水过)
    zoj 2563 Long Dominoes
  • 原文地址:https://www.cnblogs.com/alisystemsoftware/p/11271924.html
Copyright © 2011-2022 走看看