zoukankan      html  css  js  c++  java
  • 基于Spark和Tensorflow构建DCN模型进行CTR预测

    实验介绍

    数据采用Criteo Display Ads。这个数据一共11G,有13个integer features,26个categorical features。

    Spark

    由于数据比较大,且只在一个txt文件,处理前用split -l 400000 train.txt 对数据进行切分。

    连续型数据利用log进行变换,因为从实时训练的角度上来判断,一般的标准化方式,如Z-Score和最大最小标准化中用到的值都跟某一批数据的整体统计结果有关,换一批数据后标准化就程度就不一样了。

    而对于离散型分类数据,一般企业应该都会有类别表而不需要自己从数据中获取(这样能节省计算时间,而且流处理下只能针对特定批量或者时间段出现的数据进行数字编码,所以对超出该批量和时间的新类别就无法进行编码了)。虽然如此,如果在离线情况且真的需要自己从数据中提取类别并进行编码,比如现在这种情况,最直接的方法是使用ML模块的StringIndexer。这个工具方面使用,但是对于数据类别过多或者需要进行编码的列数量较多时容易出现OOM。通过StringIndexer的源码可以知道,它的实现是先利用rdd的countByValue得出特定列的统计map,然后出现频率最大的编码为0,第二的为1,如此类推。另外,它会copy这个map,而StringIndexer本身并没有提供删除这个map的方法,所以如果出现上述数据类别过多或者需要进行编码的列数量较多便会积累大量的map。而刚好这份数据有26种类别数据,且某些类别的种类居然能有三百多万种,所以只能另辟蹊径。下面的方法效仿StringIndexer的部分实现来达到目的,而且运行效率比之前有了很大的提升。当然,由于某些类别出现的频率很低,也可以采取一些cutoff措施,比如该用countByValue,只保留前n个类别,或者保留频率在某个数值以上的类别。

    下面实现考虑cutoff,出现次数少于万分之一的类别统一归类为UNK。

    val spark = SparkSession
       .builder()
       .master("local[*]")
    	// 这里的driver.memory和memory.fraction只做展示,实际使用中要在driver启动前设置才有效。即如果在idea中想增大driver的大小,这需要在VM option中设置堆大小。另外,local模式下设置提高driver大小即可,因为executor也是在同一个JVM进程中。
       .config("spark.driver.memory", 5G)
       .config("spark.sql.shuffle.partitions", 12)
       .config("spark.default.parallelism", 12)
       .config("spark.memory.fraction", 0.75)
       .getOrCreate()
    
    import org.apache.spark.sql.functions._
    val path = ""
    // 数据源是txt文件,但可以通过csv来推断格式
    val df = spark.read
      .option("header", false)
      .option("delimiter", "	")
      .option("inferSchema", true)
      .format("csv")
      .load(path + "..")
    // 如果内存够大,先把它全部加载到内存,减少IO
    df.persist(StorageLevel.MEMORY_AND_DISK_SER)
    val dataSize = df.count()
    val cutoff = dataSize * 0.0001
    
    val numCols = (1 until 14).map(i => s"_c$i").toArray
    
    var df1 = df
    numCols.foreach(column => {
      df1 = df1.withColumn(column, when(col(column).isNull, 0).otherwise(log(col(column) + 10)))
    })
    
    val catCols = (14 until 40).map(i => s"_c$i")
    var df2 = df1
    
    // 所有cat列统一编码
    // 使用java的map,通常java的集合比scala的更效率,而且java的hashmap能够初始化大小
    val inderMap: util.HashMap[String, util.HashMap[String, Int]] = new util.HashMap(catCols.length)
    var i = 0
    for (column <- catCols) {
      val uniqueElem = df2.select(column)
        .groupBy(column)
        .agg(count(column))
        .filter(col(s"count($column)") >= cutoff)
        .select(column)
        .map(_.getAs[String](0))
        .collect()
    
      val len = uniqueElem.length
      var index = 0
      val freqMap = new util.HashMap[String, Int](len)
    
      while (index < len){
        freqMap.put(uniqueElem(index), i)
        index += 1
        i += 1
      }
      freqMap.put("UNK", i)
      i += 1
      inderMap.put(column, freqMap)
    }
    
    val bcMap = spark.sparkContext.broadcast(inderMap)
    
    for (column <- catCols) {
      val Indexer = udf { elem: String =>
        val curMap = bcMap.value.get(column)
        if (elem == null || !curMap.containsKey(elem)) curMap.get("UNK")
        else curMap.get(elem)
      }
      df2 = df2.withColumn(column + "_e", Indexer(col(column))).drop(column)
    }
    // 如需要划分训练集和测试集
    val Array(train, test) = df2.randomSplit(Array(0.9, 0.1))
    
    // parquet输出
    df2.write
       .mode("overwrite")
       .save(path + "res/")
    // txt输出
    df2.map(x => x.mkString(","))
      .write
      .mode("overwrite")
      .format("text")
      .save(path + "res/txt")
    // 后面tensorflow需要
    print("The total dimension of all categorical features is " + i) // 14670
    

    Tensorflow

    下面代码大致介绍深度学习的整体流程,生产环境的代码需要做一定的修改,可以参考“https://github.com/yangxudong/deeplearning/tree/master/DCN” 和 “https://github.com/lambdaji/tf_repos/tree/master/deep_ctr” 两个GitHub的实现。

    大致流程:定义数据输入函数input_fn,然后开始规划模型和它的训练和测试operation,最后是执行阶段的代码。

    input function

    def input_fn(filenames, batch_size=32):
    
        def _parse_line(line):
            fields = tf.decode_csv(line,FIELD_DEFAULTS)
            label = fields[0]
            num_features = fields[1:14]
            cat_features = fields[14:]
            
            return num_features, cat_features, label
    
        num_features, cat_features, label = tf.data.TextLineDataset(filenames)
        .repeat(2)
        .prefetch(1024)
        .batch(batch_size)
    	.map(_parse_line, num_parallel_calls=2)
        .make_one_shot_iterator()
        .get_next()
        
        return num_features, cat_features, label
    

    数据和模型的变量

    # 构建一些input需要用到的参数。
    NUM_COLUMNS = ["c%d" % i for i in range(14)]
    CAT_COLUMNS = ["c%d" % i for i in range(14,40)]
    
    FIELD_DEFAULTS = []
    for i in range(14):
        FIELD_DEFAULTS.append([0.0]) 
    for i in range(14,40):
        FIELD_DEFAULTS.append([0]) 
    
    filenames = []
    for i in range(24):
    	...
        
    # 本地调试
    num_col = 13
    cat_col = 26
    cat_size = 14670
    embedding_size = 12
    cross_layers = 3
    deep_layers = [200,100,33]
    label_size = 1
    learning_rate = 0.0005
    

    DCN模型

    # DCN模型的构建,这里利用LOW Level API,实际上按照custom Estimator的流程会更好。
    with tf.name_scope("DCN_model"):
        he_init = tf.variance_scaling_initializer()
        with tf.name_scope("Embedding_layer"):
            x1, x2, label = input_fn(filenames,32)
            Embed_W = tf.get_variable(name='embed_w', shape=[cat_size, embedding_size],
                                      initializer=he_init)  # TC * E
            embeddings = tf.nn.embedding_lookup(Embed_W, x2) # ? * C * E
            oned_embed = tf.reshape(embeddings, shape=[-1, cat_col * embedding_size])  # ? * (C * E)
            embed_layer_res = tf.concat([x1, oned_embed], 1)  # ? * (N + C * E)
    
        with tf.name_scope("Cross_Network"):
            x0 = embed_layer_res 
            cross_x = embed_layer_res
            for level in range(cross_layers):
                Cross_W = tf.get_variable(name='cross_w%s' % level, shape=[num_col + cat_col * embedding_size, 1],
                                          initializer=he_init)  # (N + C * E) * 1
                Cross_B = tf.get_variable(name='cross_b%s' % level, shape=[1,num_col + cat_col * embedding_size],
                                          initializer=he_init)  # (N + C * E) * 1
                xtw = tf.matmul(cross_x, Cross_W)  # ? * 1
                cross_x = x0 * xtw + cross_x + Cross_B  # (N + C * E) * 1
    
        with tf.name_scope("Deep_Network"):
            deep_x = embed_layer_res
            for neurons in deep_layers:
                deep_x = tf.layers.dense(inputs=deep_x, units=neurons, name='deep_%s' % neurons,
                                         activation=tf.nn.selu, kernel_initializer=he_init)
    
        with tf.variable_scope("Output-layer"):
            x_stack = tf.concat([cross_x, deep_x], 1)  # ? * ((N + C * E) + deep_layers[-1])
            logits = tf.layers.dense(inputs=x_stack, units=label_size, name="outputs")
            z = tf.reshape(logits, shape=[-1])
            pred = tf.sigmoid(z)
    

    训练和评估指标

    with tf.name_scope("loss"):
        xentropy = tf.nn.sigmoid_cross_entropy_with_logits(labels=label, logits=z)
        loss = tf.reduce_mean(xentropy, name="loss")
        loss_summary = tf.summary.scalar('log_loss', loss)
        
    with tf.name_scope("train"):
        optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate, beta1=0.9, beta2=0.999, epsilon=1e-8)
        training_op = optimizer.minimize(loss)
        
    with tf.name_scope("eval"):
        acc, upacc = tf.metrics.accuracy(label, tf.math.round(pred))
        auc, upauc = tf.metrics.auc(label, pred)
        acc_summary = tf.summary.scalar('accuracy', upacc)
        auc_summary = tf.summary.scalar('auc', upauc)
    

    TensorBroad相关设置,optional

    from datetime import datetime
    
    def log_dir(prefix=""):
        now = datetime.utcnow().strftime("%Y%m%d%H%M%S")
        root_logdir = "tf_logs"
        if prefix:
            prefix += "-"
        name = prefix + "run-" + now
        return "{}/{}/".format(root_logdir, name)
    
    logdir = log_dir("my_dcn")
    file_writer = tf.summary.FileWriter(logdir, tf.get_default_graph())
    

    执行阶段

    # 包含checkpoint、early stop。同样,这里利用LOW Level API,实际上按照custom Estimator的流程会更好。
    n_epochs = 2
    data_size = 12000000
    batch_size = 64
    n_batches = int(np.ceil(data_size / batch_size))
    
    checkpoint_path = ".../model/my_dcn_model.ckpt"
    checkpoint_epoch_path = checkpoint_path + ".epoch"
    final_model_path = "./my_deep_mnist_model"
    
    best_auc = np.infty
    epochs_without_progress = 0
    max_epochs_without_progress = 20
    
    saver = tf.train.Saver()
    gb_init = tf.global_variables_initializer()
    lc_init = tf.local_variables_initializer()
    with tf.Session() as sess:
        if os.path.isfile(checkpoint_epoch_path):
            with open(checkpoint_epoch_path, "rb") as f:
                start_epoch = int(f.read())
            print("Training was interrupted. Continuing at epoch", start_epoch)
            saver.restore(sess, checkpoint_path)
        else:
            start_epoch = 0
            sess.run([gb_init,lc_init])
            
        for epoch in range(start_epoch, n_epochs):
            for batch_index in range(n_batches):
                # 每2000批数据测试一遍
                if batch_index % 2000 != 0:
                    sess.run(training_op)
                else:
                    loss_tr, loss_summary_str, up1, up2, acc_summary_str, auc_summary_str = sess.run([loss, loss_summary, upacc, upauc, acc_summary, auc_summary])
                    print("Epoch:", epoch, ",Batch_index:", batch_index,
                          "	Loss: {:.5f}".format(loss_tr),
                          "	ACC: ", up1,
                          "	AUC", up2)
                    file_writer.add_summary(acc_summary_str, batch_index)
                    file_writer.add_summary(auc_summary_str, batch_index)
                    file_writer.add_summary(loss_summary_str, batch_index)
                    if batch_index % 5000 == 0:
                        saver.save(sess, checkpoint_path)
                        with open(checkpoint_epoch_path, "wb") as f:
                            f.write(b"%d" % (epoch + 1))
                        if up2 < best_auc:
                            saver.save(sess, final_model_path)
                            best_auc = up2
                        else:
                            epochs_without_progress += 1
                            if epochs_without_progress > max_epochs_without_progress:
                                print("Early stopping")
                                break
    

    参考:

    玩转企业级Deep&Cross Network模型你只差一步

  • 相关阅读:
    【IOS】判断字符串为空和只为空格解决办法
    【IOS】UIActionSheet的使用
    【IOS】倒计时实现的两种方法
    android 给listview或gridview添加数据加载时的动画
    彻底颠覆你的思维,羊毛出在狗身上,猪来买单
    王石:活着的意义在哪?
    android 将图片通过base64转换为String 将图片String转换为Bitmap
    Android 之 下拉框(Spinner)的使用
    ViewPager的简单使用说明
    Android的onCreateOptionsMenu()创建菜单Menu详解
  • 原文地址:https://www.cnblogs.com/code2one/p/10343134.html
Copyright © 2011-2022 走看看