zoukankan      html  css  js  c++  java
  • X-Deep Learning功能模块

    X-Deep Learning功能模块

    1. 特征体系

    1.1 样本

    样本由 label, feature, sampleid 三部分组成;训练过程是通过feature学习label的过程,sample id是一条样本的标识。label可以是单值(二分类)或者多值(多分类)。

    1.2 特征

    传统的特征一般是稠密的(dense),维度不会特别高,样本都出现;以向量的形式表示;例如图像特征,或者用户的性别信息。
    对于搜索-推荐-广告场景,存在大量的稀疏特征(sparse):这些特征维度高(百亿),但是样本中出现次数低(数百),这类特征以多个kv的方式稀疏表示;例如商品类目(key是类目的id,没有value),用户点击过的商品列表(key是商品id,value是点击次数)。

    特征以特征组的方式组织,便于构建训练网络;例如特征分为:用户性别(稠密特征),用户偏好(稀疏特征),商品类目(稀疏特征),商品价格(稠密特征);

    1.3 网络

    简单的稀疏场景训练网络

    2. 数据准备

    2.1 样本格式

    2.1.1 文本格式

    文本格式一行表示一条样本,分为多个字段,用'|'分隔 字段定义和分隔符如下:

     

    7859345_420968_1007|user_3423487|clk_14@32490:1.0,32988:2.0;prefer@323423,32342|qscore@0.8,0.5;ad_price@33.8|0.0,1.0|1544094136

    2.1.2 protobuf格式

    简单样本格式用message定义一条样本,包括 sample_id, label, feature_line 三个部分; 文件中先以一个32位的整数记录样本长度,然后存储序列化后的样本;

    // 简单样本格式

    message SampleSimple {

        required string sample_id = 1;            // 样本的sample_id

        required Label label = 2;                 // 样本的label, Label类型

        required FeatureLine feature_line = 3;     // 样本的特征行

    }

    2.2 使用DataReader读取数据

    2.2.1 DataReader Api

    import xdl

     

    reader = xdl.DataReader("r1", # reader名称                                                                                                                                                                                              

                            paths=["./data.txt"], # 文件列表                                                                                                                                                                                    

                            enable_state=False) # 是否打开reader state,用于分布式failover,开启的时候需要额外的命令行参数(task_num)                                                                                                                                                                            

     

    reader.epochs(1).threads(1).batch_size(10).label_count(1)

    reader.feature(name='sparse0', type=xdl.features.sparse)  # 定义reader需要读取的特征,本例包括两个sparse特征组和一个dense特征组                                                                                                                                                                                   

        .feature(name='sparse1', type=xdl.features.sparse)

        .feature(name='deep0', type=xdl.features.dense, nvec=256)

    reader.startup()

    # ...

     

    2.2.2 选项设置

     

    2.2.3 结构化压缩

    结构化压缩是指,多个样本中共同的特征值,只存储一份,也只进行一次计算;需要在样本处理中,聚合多条具有共同特征的样本。

    例如广告样本中,一个用户可能点击N个广告,产生N条样本;这N条样本的广告特征簇都是不同的,但是用户特征簇都是相同的一份。

    使用结构化压缩,需要在pb样本中表达特征结构化,主要是通过定义多个特征表

    例如下图定义了一个特征主表(ad)和一个特征辅表(user),通过主表的多个ad特征指向辅表的一个user特征,表示多个样本的特征复用关系。

    a0 和 a1 特征共用一个u0特征;a2, a3, a4共用一个u1特征

    特征表的定义是一个repeated, 这样可以通过定义两个以上的特征表,来表示多层压缩。例如 图片 -> 广告 -> 用户 这样两层的多对一关系

    message SampleGroup {

        repeated string sample_ids = 1;            // 每个样本的sample_id

        repeated Label labels = 2;                 // 每个样本的label, Label类型

        repeated FeatureTable feature_tables = 3;  // 整个sample的特征表,如果没有辅表,只有一个feature_table

        repeated Extensions extensions = 5;        // 每个样本的扩展字段,待以后扩展

     

    }

     

    message Extensions {

        map<string, string> extension = 1;

    }

     

    // 标签,支持多目标训练

    message Label {

        repeated float values = 1;

    }

     

    // 特征表

    message FeatureTable {

        repeated FeatureLine feature_lines = 1; // 每个样本的特征行

    }

     

    // 特征行

    message FeatureLine {

        repeated Feature features = 1;         // 每个特征行里的特征(组)

        optional int32 refer = 2;              // 引用下层辅表的哪个特征行

    }

     

    // 特征(组)

    message Feature {

        required FeatureType type = 1;         // 特征类型

        optional string name = 2;              // 特征(组)名字,与field_id二选一

        repeated FeatureValue values = 3;      // 特征值, 一个特征(组)可能有多个特征值

    }

     

    // 特征值

    message FeatureValue {

       optional int64 key = 1;                 // 特征ID, dense可以没有

       optional float value = 2;               // 特征值,没有默认是1

       repeated float vector = 3;              // 特征向量,向量类型的特征才有,也可以用来表示稠密特征

       optional int64 hkey = 4;                // 特征ID高64位,用来支持128位hashkey

    }

    2.2.4 更多选项

     

    2.3 自定义python reader

    • xdl支持直接使用python定义op

    from tensorflow.examples.tutorials.mnist import input_data

     

    mnist_data = input_data.read_data_sets('./data')

     

    # python读取函数,直接使用tf封装好的api读取mnist数据

    def read_data(batch_size=100):                                                                                                                                                                                                               

        global mnist_data                                                                                                                                                                                                                         

        images, labels = mnist_data.train.next_batch(batch_size)                                                                                                                                                                                  

        labels = np.asarray(labels, np.float32)                                                                                                                                                                                                   

        return images, labels     

     

    # 通过xdl.py_func定义op

    images, labels = xdl.py_func(read_data, [], output_type=[np.float32, np.float32])       

     

    3. 定义模型

    XDL专注解决搜索广告等稀疏场景的模型训练性能问题,因此将模型计算分为稀疏和稠密两部分,稀疏部分通过参数服务器,GPU加速,参数合并等技术极大提升了稀疏特征的计算和通信性能。稠密部分采用多backend设计,支持TF和Mxnet两个引擎作为计算后端,并且可以使用原生TF和Mxnet API定义模型。下面分别介绍稀疏和稠密部分的API

    3.1 稀疏API

    • API列表
    • 参数说明
    • xdl.embedding

     

     xdl.merged_embedding

     

    3.2 稠密API

    XDL使用TF和Mxnet作为计算后端,并且支持使用TF和Mxnet原生API来定义模型

    • 定义方法
        1. 使用TensorFlow或者Mxnet定义模型
        1. 使用xdl.tf_wrapper或者xdl.mxnet_wrapper修饰模型定义函数
    • 装饰器参数
    • 使用TF Backend定义一个embedding + 5层dense网络

    @xdl.tf_wrapper()

    def model_fn(dense, embeddings, labels):

      input_features = [dense]

      input_features.extend(embeddings)

      inputs = tf.concat(input_features, 1)

      fc1 = tf.layers.dense(inputs, 256, activation=tf.nn.relu)

      fc2 = tf.layers.dense(fc1, 128, activation=tf.nn.relu)

      fc3 = tf.layers.dense(fc2, 64, activation=tf.nn.relu)

      fc4 = tf.layers.dense(fc3, 32, activation=tf.nn.relu)

      logits = tf.layers.dense(fc4, 1, activation=tf.nn.relu)

      cross_entropy = tf.nn.sigmoid_cross_entropy_with_logits(labels=labels, logits=logits)

      loss = tf.reduce_mean(cross_entropy)

      return loss

    • 使用Mxnet Backend定义一个embedding + 5层dense网络

    @xdl.mxnet_wrapper()

    def model_fn(dense, embeddings, label):

      input_features = [dense]

      input_features.extend(embeddings)

      inputs = mx.symbol.concat(*input_features, dim=1)

      fc1 = mx.sym.FullyConnected(data=inputs, num_hidden=256, name='fc1')

      fc2 = mx.sym.FullyConnected(data=fc1, num_hidden=128, name='fc2')

      fc3 = mx.sym.FullyConnected(data=fc2, num_hidden=64, name='fc3')

      fc4 = mx.sym.FullyConnected(data=fc3, num_hidden=32, name='fc4')

      fc5 = mx.sym.FullyConnected(data=fc4, num_hidden=1, name='fc5')

      prop = mx.symbol.SoftmaxOutput(data=fc5, label=label)

      loss = - mx.symbol.sum(mx.symbol.log(prop) * label) / 4

      return loss

    3.3 优化器

    • XDL支持常用的optimizer,包括
      • SGD
      • Momentum
      • Agagrad
      • Adam
      • Ftrl
    • 使用方法

    optimizer = xdl.SGD(0.5)

    train_op = optimizer.optimize()

    sess = xdl.TrainSession()

    sess.run(train_op)

    4. 训练模型

    XDL支持单机及分布式两种训练模式,单机模式一般用来做早期模型的调试和正确性验证,为了充分发挥XDL的稀疏计算能力,建议使用分布式模式进行大规模并行训练

    4.1 单机训练

    XDL通过Local PS的方式支持单机训练,只需运行时给python脚本加上--run_mode=local的命令行参数即可:

    python test.py --run_mode=local

    如果用户需要使用XDL镜像进行单机训练,则需要先以bash session形式进入镜像,再启动命令:

    sudo docker run -it xdl/xdl:tag /bin/bash

    python test.py --run_mode=local

    4.2 分布式训练

    • XDL通过ams存储参数,从而支持了分布式训练,在进行分布式训练时,需要启动ams进程和worker进程,ams进程包括一个scheduler和多个server,用户可以通过手动方式启动,也可以使用XDL提供的基于yarn+docker的分布式调度工具xdl_sumbit启动

    4.2.1 通过手工方式启动分布式任务

    以下命令都默认宿主机上具有XDL运行环境,或用户以bash session形式进入XDL镜像。

    • 启动ams-scheudler

    # 参数解释:

      ps_cpu_cores和ps_memory_m:分给ams-server的cpu和内存资源,将会影响ams的参数分配算法

      ckpt_dir:checkpoint地址,目前支持本地和HDFS两种文件系统

     

    python test.py --task_name=scheduler --zk_addr=zfs://xxx --ps_num=2 --ps_cpu_cores=10 --ps_memory_m=4000 --ckpt_dir=hdfs://xxx/checkpoint

    • 启动ams-server

    python test.py --task_name=ps --task_index=0 --zk_addr=zfs://[zk_server_ip:port]

    python test.py --task_name=ps --task_index=1 --zk_addr=zfs://[zk_server_ip:port]

    • 启动worker

    python test.py --task_name=worker --task_index=0 --task_num=4 --zk_addr=zfs://[zk_server_ip:port]

    python test.py --task_name=worker --task_index=1 --task_num=4 --zk_addr=zfs://[zk_server_ip:port]

    python test.py --task_name=worker --task_index=2 --task_num=4 --zk_addr=zfs://[zk_server_ip:port]

    python test.py --task_name=worker --task_index=3 --task_num=4 --zk_addr=zfs://[zk_server_ip:port]

    4.2.2 通过xdl_submit启动分布式任务

    • 使用xdl_submit需要在机器上提前部署相关环境,部署方法参见集群部署
    • xdl_submit任务基础配置示例

    {

      "job_name": "xdl_test",

      "dependent_dirs": "/home/xdl_user/xdl_test/",

      "script": "test.py",

      "docker_image": "registry.cn-hangzhou.aliyuncs.com/xdl/xdl:ubuntu-cpu-tf1.12",

      "worker": {

        "instance_num": 10,

        "cpu_cores": 4,

        "gpu_cores": 0,

        "memory_m": 1000

      },

      "ps": {

        "instance_num": 2,

        "cpu_cores": 2,

        "gpu_cores": 0,

        "memory_m": 1000

      },

      "checkpoint": {

        "output_dir": "hdfs://ns1/data/xdl_user/xdl_test/checkpoint"

      }

    }

    • 基础配置项说明 (带*号为必要配置项)
    • xdl_submit任务高级配置示例

    {

      "job_name": "xdl_test",

      "dependent_dirs": "/home/xdl_user/xdl_test/",

      "script": "test.py",

      "docker_image": "registry.cn-hangzhou.aliyuncs.com/xdl/xdl:ubuntu-cpu-tf1.12",

      "worker": {

        "instance_num": 10,

        "cpu_cores": 4,

        "gpu_cores": 0,

        "memory_m": 1000

      },

      "ps": {

        "instance_num": 2,

        "cpu_cores": 2,

        "gpu_cores": 0,

        "memory_m": 1000

      },

      "checkpoint": {

        "output_dir": "hdfs://ns1/data/xdl_user/xdl_test/checkpoint"

      },

      "scheduler_queue": "default",

      "min_finish_worker_rate": 90,

      "max_failover_times": 20,

      "max_local_failover_times": 3,

      "extend_role":{

        "ams": {

          "instance_num": 10,

          "cpu_cores": 8,

          "gpu_cores": 0,

          "memory_m": 8000,

          "script": "ams.py"

        }

      }

    }

    • 高级配置项说明
     
     
    • 提交任务
      • 将test.py放到/home/xdl_user/xdl_test/目录下,如果有其他脚本或者本地数据和配置也可以放到该目录下,xdl_submit会自动将其挂载到docker内
      • 执行命令: xdl_submit.py --config=xdl_test.json

    4.3 同步及半同步训练

    • 同步训练

    #创建session时,添加同步训练的hook

    hooks = []

    hooks.append(xdl.SyncRunHook(xdl.get_task_index(), xdl.get_task_num()))

    sess = xdl.TrainSession(hooks)

    while not sess.should_stop():

      sess.run(train_ops) 

    #sess run结束后,需要调用worker_report_finish_op

    xdl.worker_report_finish_op(np.array(xdl.get_task_index(),dtype=np.int32))

     

    • 半同步训练

    #创建session时,添加半同步训练的hook,staleness为不同worker间允许的最大差异step数,默认值为0

    hooks = []

    hooks.append(xdl.SemiSyncRunHook(xdl.get_task_index(), xdl.get_task_num(), staleness=0))

    sess = xdl.TrainSession(hooks)

    while not sess.should_stop():

      sess.run(train_ops) 

    #sess run结束后,需要调用worker_report_finish_op

    xdl.worker_report_finish_op(np.array(xdl.get_task_index(),dtype=np.int32))

    4.4 ID准入和ID退出

    • 基于概率的ID准入

    XDL在xdl.embedding接口中提供了基于概率的ID准入,使用方法如下:

    emb1 = xdl.embedding('emb1', batch['sparse0'], xdl.TruncatedNormal(stddev=0.001), 8, 1024, vtype='hash', feature_add_probability=0.9)

    feature_add_probability表示ID准入的概率,0表示哈希表完全不接受新的ID,1表示所有ID都会准入到哈希表。

    • 基于更新时间的ID退出

    如果某个ID长时间没被更新,那说明这个ID在模型中已经处于不太重要的地位,XDL提供了删除这些ID的功能,使用方法如下:

    emb1 = xdl.embedding('emb1', batch['sparse0'], xdl.TruncatedNormal(stddev=0.001), 8, 1024, vtype='hash')

    emb2 = xdl.embedding('emb2', batch['sparse1'], xdl.TruncatedNormal(stddev=0.001), 8, 1024, vtype='hash')

     

    hooks = []

     

    vars = ["emb1", "emb2"]

    mark_hook1 = xdl.GlobalStepMarkHook("emb1", batch["sparse0"].ids)

    mark_hook2 = xdl.GlobalStepMarkHook("emb2", batch["sparse1"].ids)

    hooks.append(mark_hook1)

    hooks.append(mark_hook2)

    if xdl.get_task_index() == 0:

       filter_hook = xdl.GlobalStepFilterHook(vars, 30, 10)

       hooks.append(filter_hook)

    方法说明:

    1. "emb1"和"emb2"是两路需要进行ID退出的特征,针对这两路特征创建两个GlobalStepMarkHook,具体创建方法参见上述代码;
    2. 选取一个worker(这里选择worker0),创建一个GlobalStepFilterHook,第一个参数"vars"为需要进行ID退出的变量名称集合,第二个参数"30"表示global_step每隔30步,进行一次ID退出的动作,第三个参数"10",表示如果某个ID超过10步没有被更新,在下一次的ID退出动作时,这个ID就会被删除。

    4.5 保存和恢复模型变量

    • 保存模型变量
    1. 通过Saver保存

    import xdl

    saver = xdl.Saver()

    checkpoint_version = "xxx" # checkpoint名称,一般是global step

    saver.save(version = checkpoint_version) 

    1. 通过CheckpointHook保存

    import xdl

    train_op = ...

    hook = xdl.CheckpointHook(save_interval_step=1000) # 每1000个global step保存一次

    sess = xdl.TrainSession(hooks=[hook])

    sess.run(train_op)

    • 恢复模型参数

    import xdl

    saver = xdl.Saver()

    checkpoint_version = "xxx"

    saver.restore(version = checkpoint_version)

    4.5 将二进制模型转为可读的文本格式

    import xdl

     

    variables=["x1", "x2"] # list of variable names

    xdl.convert_ps_variable(ckpt_dir="hdfs://absolute_path/to/checkpoint_dir",

      output_dir="hdfs://absolute_path/to/output_dir", var_list=variables)

    ckpt_dir既可以指定到checkpoint目录也可以指定到具体版本(ckpt......xxxx这一级)。指定到checkpoint目录会自动找到最新版本

    5. 模型评估

    模型评估是用指标反映模型在实际数据中的表现,是在训练中调整超参数,评估模型效果的重要依据。XDL提供了计算auc的默认op实现,用户也可以通过python或者c++定制自己的metrics实现

    import xdl

    saver = xdl.Saver()

    saver.restore(ckpt_version)

    labels = ...

    predictions = ...

    auc = xdl.auc(predictions, labels)

    sess = xdl.TrainSession()

    print sess.run(auc)

    6. 高层训练API:Estimator

    为了简化用户编写模型训练脚本的工作量,XDL提供了Estimator API,可以允许用户以一套代码执行训练/预测/评估/训练&评估等多种类型的任务

    使用步骤

    • 定义输入function

    # 定义train输入

    def input_fn():

        ...

        return feature_list, labels

     

    # 定义predict/evaluate输入

    def eval_input_fn():

        ...

        return test_feature_list, test_labels

    • 定义模型

    @xdl.tf_wrapper()

    def model_fn(feature_list, labels):

        logits = ...

        loss = ...

        return loss, logits

    • 创建Estimator

    estimator = xdl.Estimator(model_fn=model_fn, optimizer=xdl.SGD(0.5))

    • 进行train|evaluate|predict|train&&evaluate

    # 训练

    estimator.train(input_fn, max_step=2000, checkpoint_interval=1000)

     

    # 评估: checkpoint_version=""表示从最后一个checkpoint读取参数

    estimator.evaluate(eval_input_fn, checkpoint_version="", max_step=2000)

     

    # 预测

    estimator.predict(eval_input_fn, checkpoint_version="", max_step=2000)

     

    # 训练和评估交替执行

    estimator.train_and_evaluate(train_input_fn=input_fn,

                                 eval_input_fn=eval_input_fn,

                                 eval_interval=1000,

                                 eval_steps=200,

                                 checkpoint_interval=1000,

                                 max_step=5000)

    7. Timeline

    使用步骤

    • 在训练中产出timeline

    run_option = xdl.RunOption()                                                                                                                                                                                                              

    run_option.perf = True                                                                                                                                                                                                                    

    run_statistic = xdl.RunStatistic()                                                                                                                                                                                                        

    _ = sess.run(train_ops, run_option, run_statistic)                                                                                                                                                                                   

    xdl.Timeline(run_statistic.perf_result).save('./timeline.json')     

     

    • 在chrome中输入chrome://tracing,加载timeline.json

     

    人工智能芯片与自动驾驶
  • 相关阅读:
    CSS3 Animation
    css形状大全
    HTML5 表单属性
    HTML5 Input 类型
    HTML 5 服务器发送事件
    jq制作博客已存在多少天
    Java网络编程与NIO详解4:浅析NIO包中的Buffer、Channel 和 Selector
    Java网络编程和NIO详解3:IO模型与Java网络编程模型
    Java网络编程与NIO详解2:JAVA NIO 一步步构建IO多路复用的请求模型
    Java网络编程和NIO详解1:JAVA 中原生的 socket 通信机制
  • 原文地址:https://www.cnblogs.com/wujianming-110117/p/14394923.html
Copyright © 2011-2022 走看看