zoukankan      html  css  js  c++  java
  • lakala反欺诈建模实际应用代码GBDT监督学习

    /**
      * Created by lkl on 2018/1/16.
      */
    import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.tree.GradientBoostedTrees
    import org.apache.spark.mllib.tree.configuration.BoostingStrategy
    import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel
    import org.apache.spark.sql.{Row, SaveMode}
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
    import org.apache.spark.{SparkConf, SparkContext}
    import scala.collection.mutable.ArrayBuffer
    /**
      * Spark (MLlib RDD API) batch job that trains gradient-boosted regression trees on a Hive
      * table, writes evaluation curves to HDFS and stores the per-row scores in Hive tables.
      *
      * Steps performed by [[main]]:
      *  1. Load `lkl_card_score.fqz_score_dataset_03train` as labeled points.
      *  2. Train a GBDT regression model on the full data set, save it and its debug string.
      *  3. Reload the saved model, score the full data set and write precision / recall /
      *     F-measure / PR / ROC outputs as text files.
      *  4. Score every row again (keeping the first three columns) and overwrite
      *     `lkl_card_score.fqz_score_dataset_03train_predict`.
      *  5. Train a second model on a re-balanced sample of another table and store its scores
      *     in `lkl_card_score.fqz_score_dataset_02val_170506_predict`.
      *
      * Column layout assumed by the code: column 2 is the label and columns 3.. are features;
      * columns 0 and 1 are written out as `order_id` and `apply_time`.
      */
    object abregression3Model20180116 {

      /** Position of the label column in every input row. */
      private val LabelColumn = 2

      /** Position of the first feature column; everything before it is an id or the label. */
      private val FirstFeatureColumn = 3

      private val ModelPath = "hdfs://ns1/user/songchunlin/model/abregression3Model20180116"
      private val DebugStringPath =
        "hdfs://ns1/user/songchunlin/model/toDebugString/abregression3Model20180116"
      private val MetricsDir = "hdfs://ns1/user/szdsjkf/model_training/jrsc/20171218"

      /**
        * Converts one cell to a feature value.
        *
        * NULLs and non-numeric values (e.g. strings) become 0.0. Every numeric type is widened
        * to Double. The match is total on purpose: each column must contribute exactly one
        * value, otherwise rows of the same table would yield vectors of different lengths.
        */
      private def featureValue(row: Row, index: Int): Double =
        if (row.isNullAt(index)) 0.0
        else row.get(index) match {
          case n: java.lang.Number => n.doubleValue()
          case _                   => 0.0
        }

      /** Builds the dense feature vector from columns `FirstFeatureColumn` until the row end. */
      private def features(row: Row): org.apache.spark.mllib.linalg.Vector =
        Vectors.dense((FirstFeatureColumn until row.size).map(featureValue(row, _)).toArray)

      /**
        * Reads the label column as a Double.
        *
        * Accepts any numeric column type and numeric strings, so the job does not depend on
        * the exact Hive type of the label column.
        *
        * @throws IllegalArgumentException if the label is NULL or of an unsupported type
        */
      private def label(row: Row): Double = row.get(LabelColumn) match {
        case n: java.lang.Number => n.doubleValue()
        case s: String           => s.trim.toDouble
        case other =>
          throw new IllegalArgumentException(
            s"Unsupported label value at column $LabelColumn: $other")
      }

      /** Maps a Hive row to the (label, features) pair consumed by MLlib. */
      private def toLabeledPoint(row: Row): LabeledPoint = LabeledPoint(label(row), features(row))

      /** GBDT regression settings shared by both models: 40 trees, depth 6, >= 50 rows per node. */
      private def regressionStrategy(): BoostingStrategy = {
        val strategy = BoostingStrategy.defaultParams("Regression")
        strategy.setNumIterations(40) // Note: Use more iterations in practice.
        strategy.treeStrategy.setMaxDepth(6)
        strategy.treeStrategy.setMinInstancesPerNode(50)
        strategy
      }

      /** Pairs each point's predicted score with its true label, in (prediction, label) order. */
      private def predictionsWithLabels(
          model: GradientBoostedTreesModel,
          points: org.apache.spark.rdd.RDD[LabeledPoint]): org.apache.spark.rdd.RDD[(Double, Double)] =
        points.map(p => (model.predict(p.features), p.label))

      /** Renders a cell as a string while keeping SQL NULL as null instead of throwing an NPE. */
      private def asString(value: Any): String = Option(value).map(_.toString).orNull

      /**
        * Scores every instance with `model` and overwrites the Hive table `table` with the
        * columns (order_id, apply_time, label, score).
        */
      private def scoreAndSave(
          hc: HiveContext,
          instances: org.apache.spark.rdd.RDD[(Any, Any, Any, org.apache.spark.mllib.linalg.Vector)],
          model: GradientBoostedTreesModel,
          table: String): Unit = {
        val scoredRows = instances.map { case (orderId, applyTime, rawLabel, vector) =>
          Row(asString(orderId), asString(applyTime), asString(rawLabel), model.predict(vector))
        }
        val schema = StructType(
          List(
            StructField("order_id", StringType, true),
            StructField("apply_time", StringType, true),
            StructField("label", StringType, true),
            StructField("score", DoubleType, true)
          )
        )
        // Apply the schema to the row RDD and persist it; the write itself triggers the job.
        hc.createDataFrame(scoredRows, schema)
          .write.mode(SaveMode.Overwrite).saveAsTable(table)
      }

      /**
        * Job entry point.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("abregression3Model20180116")
        val sc = new SparkContext(sparkConf)
        // Always release cluster resources, even when a stage fails.
        try run(sc, new HiveContext(sc)) finally sc.stop()
      }

      /** Runs the whole train / evaluate / score pipeline on the given contexts. */
      private def run(sc: SparkContext, hc: HiveContext): Unit = {
        val data = hc.sql("select * from lkl_card_score.fqz_score_dataset_03train")
          .rdd.map(row => toLabeledPoint(row))
        // Boosting is iterative and `data` is also scored several times below, so cache it.
        data.cache()

        // 70% / 30% split. Only the size of the 30% part is reported: the model is trained
        // and scored on the full data set (scoring was deliberately switched from the test
        // split to all rows).
        // NOTE(review): the metrics below are therefore in-sample - confirm this is intended.
        val Array(_, testData) = data.randomSplit(Array(0.7, 0.3), seed = 11L)

        val model = GradientBoostedTrees.train(data, regressionStrategy())
        model.save(sc, ModelPath)

        sc.makeRDD(Seq(model.toDebugString)).repartition(1).saveAsTextFile(DebugStringPath)

        // Score with the model as read back from HDFS, which also verifies the saved copy.
        val omodel = GradientBoostedTreesModel.load(sc, ModelPath)
        val predictionAndLabels = predictionsWithLabels(omodel, data)
        // Reused by the count, the text dump and every metric below.
        predictionAndLabels.cache()

        println("testData count = " + testData.count())
        println("predictionAndLabels count = " + predictionAndLabels.count())
        predictionAndLabels
          .map { case (prediction, actual) => s"predicts: $prediction--> labels:$actual" }
          .saveAsTextFile(s"$MetricsDir/predictionAndLabels")

        val metrics = new BinaryClassificationMetrics(predictionAndLabels)

        metrics.precisionByThreshold
          .map { case (t, p) => s"Threshold: ${t}Precision:$p" }
          .saveAsTextFile(s"$MetricsDir/precision")

        metrics.recallByThreshold
          .map { case (t, r) => s"Threshold: ${t}Recall:$r" }
          .saveAsTextFile(s"$MetricsDir/recall")

        // F-measure with beta = 2. The output directory keeps its historical name "f1Score".
        val beta = 2.0
        metrics.fMeasureByThreshold(beta)
          .map { case (t, f) => s"Threshold: $t--> F-score:$f--> Beta = 2" }
          .saveAsTextFile(s"$MetricsDir/f1Score")

        metrics.pr
          .map { case (r, p) => s"Recall: $r--> Precision: $p" }
          .saveAsTextFile(s"$MetricsDir/prc")

        // AUPRC: area under the precision-recall curve.
        val auPRC = metrics.areaUnderPR
        println("Area under precision-recall curve = " + auPRC)
        sc.makeRDD(Seq("Area under precision-recall curve = " + auPRC))
          .saveAsTextFile(s"$MetricsDir/auPRC")

        // ROC curve.
        metrics.roc
          .map { case (fpr, tpr) => s"FalsePositiveRate:$fpr--> Recall: $tpr" }
          .saveAsTextFile(s"$MetricsDir/roc")

        // AUC: area under the ROC curve.
        val auROC = metrics.areaUnderROC
        sc.makeRDD(Seq("Area under ROC = " + auROC)).saveAsTextFile(s"$MetricsDir/auROC")
        println("Area under ROC = " + auROC)

        // Same table again, this time keeping the first three columns next to the features
        // so that the scores can be written back together with them.
        val dataInstance = hc.sql("select * from lkl_card_score.fqz_score_dataset_03train")
          .rdd.map(row => (row(0), row(1), row(2), features(row)))

        scoreAndSave(hc, dataInstance, model, "lkl_card_score.fqz_score_dataset_03train_predict")

        // ---- Experimental second model trained on a re-balanced sample ----
        // NOTE(review): LIMIT inside an un-parenthesised UNION ALL branch - confirm the SQL
        // dialect in use applies it to the first branch only.
        val balance = hc.sql("select * from lkl_card_score.overdue_result_all_new_woe_instant_v3_02train where label='1' limit 85152 union all  select * from lkl_card_score.overdue_result_all_new_woe_instant_v3_02train where label='0'")
          .rdd.map(row => toLabeledPoint(row))
        // Boosting is iterative, so keep the training set in memory.
        balance.cache()

        val model2 = GradientBoostedTrees.train(balance, regressionStrategy())

        // Evaluate the second model on the first data set and report its AUPRC.
        val metrics2 = new BinaryClassificationMetrics(predictionsWithLabels(model2, data))
        val auPRC1 = metrics2.areaUnderPR
        println("Area under precision-recall curve (balanced model) = " + auPRC1)

        // NOTE(review): the target table is named "02val" but the rows scored come from the
        // 03train table - confirm this is the intended input.
        scoreAndSave(hc, dataInstance, model2,
          "lkl_card_score.fqz_score_dataset_02val_170506_predict")
      }
    }
    
  • 相关阅读:
    MDX查询语句
    MyEclipse 点击 部署 按钮 无效
    C#创建数字证书并导出为pfx,并使用pfx进行非对称加解密
    SSIS – 变量和表达式
    使用 SSIS Foreach Loop 容器 – Foreach Item Enumerator
    SSIS – For Loop Container
    SSIS 中的文件系统任务 (File System Task)
    TypeError: parse() got an unexpected keyword argument 'transport_encoding' 安装tensor后报错
    np基本函数大全
    使用OpenCV对图像进行缩放
  • 原文地址:https://www.cnblogs.com/canyangfeixue/p/8296511.html
Copyright © 2011-2022 走看看