5.2.从数据中提取合适的特征
[root@demo1 ch05]# sed 1d train.tsv > train_noheader.tsv
[root@demo1 ch05]# ll
total 42920
-rw-r--r-- 1 root root 21972457 Jan 31 15:03 train_noheader.tsv
-rw-r--r-- 1 root root 21972916 Jan 31 15:00 train.tsv
[root@demo1 ch05]# hdfs dfs -mkdir /user/root/studio/MachineLearningWithSpark/ch05
[root@demo1 ch05]# hdfs dfs -put train_noheader.tsv /user/root/studio/MachineLearningWithSpark/ch05
[root@demo1 ch05]# spark-shell --master yarn
scala> val rawData = sc.textFile("/user/root/studio/MachineLearningWithSpark/ch05/train_noheader.tsv")
rawData: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:27
scala> val records = rawData.map(line => line.split(" "))
records: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:29
scala> records.first()
res1: Array[String] = Array("http://www.bloomberg.com/news/2010-12-23/ibm-predicts-holographic-calls-air-breathing-batteries-by-2015.html", "4042", "{""title"":""IBM Sees Holographic Calls Air Breathing Batteries ibm sees holographic calls, air-breathing batteries"",""body"":""A sign stands outside the International Business Machines Corp IBM Almaden Research Center campus in San Jose California Photographer Tony Avelar Bloomberg Buildings stand at the International Business Machines Corp IBM Almaden Research Center campus in the Santa Teresa Hills of San Jose California Photographer Tony Avelar Bloomberg By 2015 your mobile phone will project a 3 D image of anyone who calls and your laptop will be powered by kinetic energy At least that s what International Business Machines Corp sees ...
scala> import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.LabeledPoint
scala> import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vectors
scala> val data = records.map{ r =>
| val trimmed = r.map(_.replaceAll(""",""))
| val label = trimmed(r.size - 1).toInt
| val features = trimmed.slice(4, r.size - 1).map(d => if (d == "?") 0.0 else d.toDouble)
| LabeledPoint(label, Vectors.dense(features))
| }
data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[3] at map at <console>:33
5.3.训练分类模型
scala> import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
scala> import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.classification.SVMWithSGD
scala> import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.classification.NaiveBayes
scala> import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.DecisionTree
scala> import org.apache.spark.mllib.tree.configuration.Algo
import org.apache.spark.mllib.tree.configuration.Algo
scala> import org.apache.spark.mllib.tree.impurity.Entropy
import org.apache.spark.mllib.tree.impurity.Entropy
scala> val numIterations = 10
numIterations: Int = 10
scala> val maxTreeDepth = 5
maxTreeDepth: Int = 5
scala> val lrModel = LogisticRegressionWithSGD.train(data, numIterations)
lrModel: org.apache.spark.mllib.classification.LogisticRegressionModel = org.apache.spark.mllib.classification.LogisticRegressionModel: intercept = 0.0, numFeatures = 22, numClasses = 2, threshold = 0.5
scala> val svmModel = SVMWithSGD.train(data, numIterations)
svmModel: org.apache.spark.mllib.classification.SVMModel = org.apache.spark.mllib.classification.SVMModel: intercept = 0.0, numFeatures = 22, numClasses = 2, threshold = 0.0
scala> val nbModel = NaiveBayes.train(nbData)
nbModel: org.apache.spark.mllib.classification.NaiveBayesModel = org.apache.spark.mllib.classification.NaiveBayesModel@42cf75c1
scala> val dtModel = DecisionTree.train(data, Algo.Classification, Entropy, maxTreeDepth)
dtModel: org.apache.spark.mllib.tree.model.DecisionTreeModel = DecisionTreeModel classifier of depth 5 with 61 nodes
5.4使用分类模型
scala> val dataPoint = data.first
dataPoint: org.apache.spark.mllib.regression.LabeledPoint = (0.0,[0.789131,2.055555556,0.676470588,0.205882353,0.047058824,0.023529412,0.443783175,0.0,0.0,0.09077381,0.0,0.245831182,0.003883495,1.0,1.0,24.0,0.0,5424.0,170.0,8.0,0.152941176,0.079129575])
scala> val prediction = lrModel.predict(dataPoint.features)
prediction: Double = 1.0
scala> val trueLabel = dataPoint.label
trueLabel: Double = 0.0
scala> val predictions = lrModel.predict(data.map(lp => lp.features))
predictions: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[99] at mapPartitions at GeneralizedLinearAlgorithm.scala:69
scala> predictions.take(5)
res3: Array[Double] = Array(1.0, 1.0, 1.0, 1.0, 1.0)
5.5.评估分类模型的性能
scala> val lrTotalCorrect = data.map { point =>
| if (lrModel.predict(point.features) == point.label) 1 else 0
| }.sum
lrTotalCorrect: Double = 3806.0
scala> val lrAccuracy = lrTotalCorrect / data.count
lrAccuracy: Double = 0.5146720757268425
scala> val svmTotalCorrect = data.map { point =>
| if (svmModel.predict(point.features) == point.label) 1 else 0
| }.sum
svmTotalCorrect: Double = 3806.0
scala> val svmAccuracy = svmTotalCorrect / data.count
svmAccuracy: Double = 0.5146720757268425
scala> val nbTotalCorrect = nbData.map { point =>
| if (nbModel.predict(point.features) == point.label) 1 else 0
| }.sum
nbTotalCorrect: Double = 4292.0
scala> val nbAccuracy = nbTotalCorrect / data.count
nbAccuracy: Double = 0.5803921568627451
scala> val dtTotalCorrect = data.map { point =>
| val score = dtModel.predict(point.features)
| val predicted = if (score > 0.5) 1 else 0
| if (predicted == point.label) 1 else 0
| }.sum
dtTotalCorrect: Double = 4794.0
scala> val dtAccuracy = dtTotalCorrect / data.count
dtAccuracy: Double = 0.6482758620689655
scala> import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
scala> val metrics = Seq(lrModel, svmModel).map { model =>
| val scoreAndLabels = data.map { point => (model.predict(point.features), point.label) }
| val metrics = new BinaryClassificationMetrics(scoreAndLabels)
| (model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
| }
metrics: Seq[(String, Double, Double)] = List((LogisticRegressionModel,0.7567586293858841,0.5014181143280931), (SVMModel,0.7567586293858841,0.5014181143280931))
scala> val nbMetrics = Seq(nbModel).map { model =>
| val scoreAndLabels = nbData.map { point =>
| val score = model.predict(point.features)
| (if (score > 0.5) 1.0 else 0.0, point.label)
| }
| val metrics = new BinaryClassificationMetrics(scoreAndLabels)
| (model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
| }
nbMetrics: Seq[(String, Double, Double)] = List((NaiveBayesModel,0.6808510815151734,0.5835585110136261))
scala> val dtMetrics = Seq(dtModel).map { model =>
| val scoreAndLabels = data.map { point =>
| val score = model.predict(point.features)
| (if (score > 0.5) 1.0 else 0.0, point.label)
| }
| val metrics = new BinaryClassificationMetrics(scoreAndLabels)
| (model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
| }
dtMetrics: Seq[(String, Double, Double)] = List((DecisionTreeModel,0.7430805993331199,0.6488371887050935))
scala> val allMetrics = metrics ++ nbMetrics ++ dtMetrics
allMetrics: Seq[(String, Double, Double)] = List((LogisticRegressionModel,0.7567586293858841,0.5014181143280931), (SVMModel,0.7567586293858841,0.5014181143280931), (NaiveBayesModel,0.6808510815151734,0.5835585110136261), (DecisionTreeModel,0.7430805993331199,0.6488371887050935))
scala> allMetrics.foreach { case (m, pr, roc) =>
| println(f"$m, Area under PR: ${pr * 100.0}%2.4f%%, Area under ROC: ${roc * 100.0}%2.4f%%")
| }
LogisticRegressionModel, Area under PR: 75.6759%, Area under ROC: 50.1418%
SVMModel, Area under PR: 75.6759%, Area under ROC: 50.1418%
NaiveBayesModel, Area under PR: 68.0851%, Area under ROC: 58.3559%
DecisionTreeModel, Area under PR: 74.3081%, Area under ROC: 64.8837%