Spark中常见的三种分类模型:线性模型、决策树和朴素贝叶斯模型。
线性模型,简单而且相对容易扩展到非常大的数据集;线性模型又可以分成:1.逻辑回归;2.线性支持向量机
决策树是一个强大的非线性技术,训练过程计算量大并且较难扩展(幸运的是,MLlib会替我们考虑扩展性的问题),但是在很多情况下性能很好;
朴素贝叶斯模型简单、易训练,并且具有高效和并行的优点(实际中,模型训练只需要遍历所有数据集一次)。当采用合适的特征工程,这些模型在很多应用中都能达到不错的性能。而且,朴素贝叶斯模型可以作为一个很好的模型测试基准,用于比较其他模型的性能。
现在我们采用的数据集是stumbleupon,这个数据集是主要是一些网页的分类数据。
内容样例:String = "http://www.bloomberg.com/news/2010-12-23/ibm-predicts-holographic-calls-air-breathing-batteries-by-2015.html" "4042" "{""title"":""IBM Sees Holographic Calls Air Breathing Batteries ibm sees holographic calls, air-breathing batteries"",""body"":""A sign stands outside the International Business Machines Corp IBM Almaden Research Center campus in San Jose California Photographer Tony Avelar Bloomberg Buildings stand at the International Business Machines Corp IBM Almaden Research Center campus in the Santa Teresa Hills of San Jose California Photographer Tony Avelar Bloomberg By 2015 your mobile phone will project a 3 D image of anyone who calls and your laptop will be powered by kinetic energy At least that s what International Business Machines Corp sees in its crystal ...
开始四列分别包含 URL 、页面的 ID 、原始的文本内容和分配给页面的类别。接下来 22 列包含各种各样的数值或者类属特征。最后一列为目标值, -1 为长久, 0 为短暂。
val rawData = sc.textFile("/user/common/stumbleupon/train_noheader.tsv") val records = rawData.map(line => line.split(" ")) records.first()
由于数据格式的问题,我们做一些数据清理的工作,在处理过程中把额外的( " )去掉。数据集中还有一些用 "?" 代替的缺失数据,本例中,我们直接用 0 替换那些缺失数据 。
在清理和处理缺失数据后,我们提取最后一列的标记变量以及第 5 列到第 25 列的特征矩阵。将标签变量转换为 Int 值,特征向量转换为 Double 数组。
最后,我们将标签和和特征向量转换为 LabeledPoint 实例,从而将特征向量存储到 MLlib 的 Vector 中。
import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.linalg.Vectors val data = records.map { r => val trimmed = r.map(_.replaceAll(""", "")) val label = trimmed(r.size - 1).toInt val features = trimmed.slice(4, r.size - 1).map(d => if (d =="?") 0.0 else d.toDouble) LabeledPoint(label, Vectors.dense(features)) }
(朴素贝叶斯特殊的数据处理)在对数据集做进一步处理之前,我们发现数值数据中包含负的特征值。我们知道,朴素贝叶斯模型要求特征值非负,否则碰到负的特征值程序会抛出错误。因此,需要为朴素贝叶斯模型构建一份输入特征向量的数据,将负特征值设为 0
val nbData = records.map { r => val trimmed = r.map(_.replaceAll(""", "")) val label = trimmed(r.size - 1).toInt val features = trimmed.slice(4, r.size - 1).map(d => if (d =="?") 0.0 else d.toDouble).map(d => if (d < 0) 0.0 else d) LabeledPoint(label, Vectors.dense(features)) }
分别训练逻辑回归、SVM、朴素贝叶斯模型和决策树
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD import org.apache.spark.mllib.classification.SVMWithSGD import org.apache.spark.mllib.classification.NaiveBayes import org.apache.spark.mllib.tree.DecisionTree import org.apache.spark.mllib.tree.configuration.Algo import org.apache.spark.mllib.tree.impurity.Entropy val numIterations = 10 val maxTreeDepth = 5
训练逻辑回归模型
val lrModel = LogisticRegressionWithSGD.train(data, numIterations)
训练SVM模型
val svmModel = SVMWithSGD.train(data, numIterations)
训练朴素贝叶斯模型
val nbModel = NaiveBayes.train(nbData)
训练决策树模型
val dtModel = DecisionTree.train(data, Algo.Classification, Entropy, maxTreeDepth)
验证预测结果的正确性,以逻辑回归为例子,说明预测的结果是错误的
val dataPoint = data.first val prediction = lrModel.predict(dataPoint.features) # 输出 prediction: Double = 1.0 val trueLabel = dataPoint.label # 输出 trueLabel: Double = 0.0
评估分类模型的性能
1.逻辑回归模型
val lrTotalCorrect = data.map { point => if (lrModel.predict(point.features) == point.label) 1 else 0 }.sum val lrAccuracy = lrTotalCorrect / data.count
lrAccuracy: Double = 0.5146720757268425
2.SVM模型
val svmTotalCorrect = data.map { point => if (svmModel.predict(point.features) == point.label) 1 else 0 }.sum val svmAccuracy = svmTotalCorrect / data.count
svmAccuracy: Double = 0.5146720757268425
3.贝叶斯模型
val nbTotalCorrect = nbData.map { point => if (nbModel.predict(point.features) == point.label) 1 else 0 }.sum val nbAccuracy = nbTotalCorrect / data.count
nbAccuracy: Double = 0.5803921568627451
4.决策树模型
val dtTotalCorrect = data.map { point => val score = dtModel.predict(point.features) val predicted = if (score > 0.5) 1 else 0 if (predicted == point.label) 1 else 0 }.sum val dtAccuracy = dtTotalCorrect / data.count
dtAccuracy: Double = 0.6482758620689655
准确率和召回率
改进模型性能以及参数调优
1.特征标准化
研究特征是如何分布的,先将特征向量用 RowMatrix 类表示成 MLlib 中的分布矩阵。 RowMatrix 是一个由向量组成的 RDD ,其中每个向量是分布矩阵的一行。
RowMatrix 类中有一些方便操作矩阵的方法,其中一个方法可以计算矩阵每列的统计特性:
import org.apache.spark.mllib.linalg.distributed.RowMatrix val vectors = data.map(lp => lp.features) val matrix = new RowMatrix(vectors) val matrixSummary = matrix.computeColumnSummaryStatistics() #computeColumnSummaryStatistics 方法计算特征矩阵每列的不同统计数据,包括均值和方差,所有统计值按每列一项的方式存储在一个 Vector 中
println(matrixSummary.mean) #输出矩阵每列的均值 println(matrixSummary.min) #输出矩阵每列的最小值 println(matrixSummary.max) #输出矩阵每列的最大值 println(matrixSummary.variance) #输出矩阵每列的方差 println(matrixSummary.numNonzeros) #输出矩阵每列中非 0 项的数目
对特征矩阵进行归一化
import org.apache.spark.mllib.feature.StandardScaler val scaler = new StandardScaler(withMean = true, withStd = true).fit(vectors) # 传入两个参数,一个表示是否从数据中减去均值,另一个表示是否应用标准差缩放 val scaledData = data.map(lp => LabeledPoint(lp.label,scaler.transform(lp.features)))
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics val lrModelScaled = LogisticRegressionWithSGD.train(scaledData, numIterations) val lrTotalCorrectScaled = scaledData.map { point => if (lrModelScaled.predict(point.features) == point.label) 1 else 0 }.sum val lrAccuracyScaled = lrTotalCorrectScaled / numData val lrPredictionsVsTrue = scaledData.map { point => (lrModelScaled.predict(point.features), point.label) } val lrMetricsScaled = new BinaryClassificationMetrics(lrPredictionsVsTrue) val lrPr = lrMetricsScaled.areaUnderPR val lrRoc = lrMetricsScaled.areaUnderROC println(f"${lrModelScaled.getClass.getSimpleName} Accuracy:${lrAccuracyScaled * 100}%2.4f%% Area under PR: ${lrPr *100.0}%2.4f%% Area under ROC: ${lrRoc * 100.0}%2.4f%%")
可以看出,特征标准化提升了逻辑回归模型的准确率和AUC
LogisticRegressionModel Accuracy:62.0419% Area under PR: 72.7254% Area under ROC: 61.9663%