/**
  * Created by lkl on 2017/12/7.
  */
import breeze.numerics.abs
import org.apache.spark.sql.SQLContext
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel

/**
  * Trains and evaluates a random-forest classifier on the records of `20171117PP.txt`.
  *
  * Each input line is expected to end in `...,0],[[x,y,t],[x,y,t],...];OK` (or `;FAIL`).
  * Lines containing "OK" are labelled 1.0, lines containing "FAIL" are labelled 0.0.
  * All OK records plus a random ~10% sample of the FAIL records are converted into
  * 2500-dimensional feature vectors, split 50/50 into training and test sets, and a
  * random forest is trained, evaluated, printed, saved and reloaded.
  */
object proportion {

  /** Input text file holding one record per line. */
  private val InputPath = "20171117PP.txt"

  /** Output directory the parsed OK records are written to. */
  private val OkSamplesOutputPath = "201712072"

  /** Directory the trained model is saved to and reloaded from. */
  private val ModelPath = "myModelPath23"

  /** Length of every feature vector. */
  private val FeatureCount = 2500

  /**
    * Program entry point: creates a local SparkContext, runs the pipeline and always
    * stops the context afterwards, even when the pipeline fails.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val cf = new SparkConf().setAppName("ass").setMaster("local")
    val sc = new SparkContext(cf)
    try {
      run(sc)
    } finally {
      sc.stop()
    }
  }

  /**
    * Loads the lines of the input file that contain `marker` and turns each into a
    * `(label, encodedTriples)` pair.
    *
    * The text after the last `,0],` is kept, "OK"/"FAIL" are rewritten to "1"/"0", and the
    * result is split on ';': the last field is the numeric label, the first field is the
    * encoded triple list.
    *
    * @param sc     active Spark context
    * @param marker substring a line must contain to be kept ("OK" or "FAIL")
    * @return RDD of (label, encoded triple list)
    */
  private def loadLabeled(sc: SparkContext, marker: String): org.apache.spark.rdd.RDD[(Double, String)] =
    sc.textFile(InputPath)
      .filter(_.contains(marker))
      // Replace the separator by "a[" so that splitting on "a" isolates the trailing payload.
      .map(_.replace(",0],", "a[").split("a").last)
      .map(_.replace("OK", "1").replace("FAIL", "0"))
      .map { line =>
        val fields = line.split(";")
        (fields.last.toDouble, fields.head)
      }

  /**
    * Parses an encoded triple list such as `[[1,2,3],[4,5,6]]` into integer triples.
    *
    * @param encoded bracketed list of comma-separated integer triples
    * @return one `(x, y, t)` tuple per inner list
    * @throws NumberFormatException if a component is not an integer
    */
  private def parseTriples(encoded: String): Array[(Int, Int, Int)] =
    encoded
      .substring(2, encoded.length - 2) // drop the leading "[[" and the trailing "]]"
      .replace("],[", "a")
      .split("a")
      .map { triple =>
        val parts = triple.split(",")
        (parts(0).toInt, parts(1).toInt, parts(2).toInt)
      }

  /**
    * Builds the dense feature row for one record.
    *
    * Triples are grouped by slot `x * 10 + (y + 5) + 1`; the value stored in a slot is the
    * ratio of distinct `t` values to the number of triples in that slot, rounded to two
    * decimals. Slots without triples stay 0.0.
    *
    * NOTE(review): a slot outside `0 until FeatureCount` raises
    * ArrayIndexOutOfBoundsException, exactly as before — confirm the value ranges of x and y.
    *
    * @param triples parsed `(x, y, t)` triples of one record
    * @return feature array of length `FeatureCount`
    */
  private def toFeatureRow(triples: Array[(Int, Int, Int)]): Array[Double] = {
    val row = new Array[Double](FeatureCount)
    triples
      .groupBy { case (x, y, _) => x * 10 + (y + 5) + 1 }
      .foreach { case (slot, hits) =>
        val values = hits.map(_._3)
        val ratio = values.distinct.length.toDouble / values.length.toDouble
        // Format with a fixed locale: the default locale may use ',' as the decimal
        // separator, which would make toDouble throw.
        row(slot) = "%1.2f".formatLocal(java.util.Locale.ROOT, ratio).toDouble
      }
    row
  }

  /**
    * Runs the whole pipeline: load, vectorise, split, train, evaluate, persist.
    *
    * @param sc active Spark context
    */
  private def run(sc: SparkContext): Unit = {
    val okSamples = loadLabeled(sc, "OK")
    okSamples.saveAsTextFile(OkSamplesOutputPath)

    // Only a random ~10% of the FAIL records are used; the remaining ~90% are discarded.
    val failSubset = loadLabeled(sc, "FAIL").randomSplit(Array(0.1, 0.9))(0)

    val usersList = okSamples.union(failSubset).map { case (label, encoded) =>
      LabeledPoint(label, Vectors.dense(toFeatureRow(parseTriples(encoded))))
    }

    val splits = usersList.randomSplit(Array(1.0, 1.0))
    val (trainingData, testData) = (splits(0), splits(1))

    // Random forest training parameters.
    // Number of classes.
    val numClasses = 2
    // An empty categoricalFeaturesInfo means every feature is treated as continuous.
    val categoricalFeaturesInfo = Map[Int, Int]()
    // Number of trees.
    val numTrees = 18
    // Feature subset sampling strategy; "auto" lets the algorithm choose.
    val featureSubsetStrategy = "auto"
    // Impurity measure.
    val impurity = "gini"
    // Maximum tree depth.
    val maxDepth = 20
    // Maximum number of bins used for features.
    val maxBins = 30

    // Train the random forest classifier; trainClassifier returns a RandomForestModel.
    val model = RandomForest.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,
      numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)

    val labelAndPreds = testData.map { point =>
      (point.label, model.predict(point.features))
    }

    // Alternative check via Spark SQL (left disabled, as in the original):
    //   val sqlContext = new SQLContext(sc)
    //   import sqlContext.implicits._
    //   labelAndPreds.toDF("a", "b").registerTempTable("people")
    //   sqlContext.sql("SELECT count(1) FROM people WHERE a-b=0").collect()

    val misclassified = labelAndPreds.filter { case (label, predicted) => label != predicted }.count
    val testErr = misclassified.toDouble / testData.count()
    val accurate = 1 - testErr
    // Report each metric under its own name (accuracy used to be printed as "Test Error").
    println("Test Error = " + testErr)
    println("Test Accuracy = " + accurate)
    println("Learned classification forest model: " + model.toDebugString)

    // Persist the trained random forest model.
    model.save(sc, ModelPath)
    // Load the model back from the same path it was saved to, as a round-trip check.
    RandomForestModel.load(sc, ModelPath)
  }
}