一. 简介
1. 机器学习中,可以将数据划分为连续数据和离散数据
a. 连续数据:可以取任何值,如房价
b. 离散数据:仅有少量特殊值,如一个房屋有2个或3个房间,但不能为2.75个房间
二. 创建向量
1. 向量中的各个维度称为特征
2. Spark中既有局部向量、矩阵,也有分布式矩阵。分布式矩阵由1个多个RDD支持。局部向量有数值型索引和双精度浮点值,且存储在单一机器上。
3. MLlib中有2种类型局部向量:dense和sparse。
a. 稠密型向量(dense)表示为一个数组,如[4500, 41000, 4]
b. 稀疏型向量(sparse)则由两个平行数组构成,其中一个表示索引,另一个表示元素值,如(3, [0, 1, 2], [4500.0, 41000.0, 4.0])
c. 将向量创建为dense或sparse,依赖于数据中有多少null或0。当10000个数据中有9000个数据均为0,若使用稠密型向量,则会浪费空间。
d. 稀疏型向量较常见,Spark原生支持libsvm格式,每一行存储一个特征。
4. 示例
a. 显示导入MLlib向量:
三. 计算相关性
1. 相关性表示两个变量的统计上的关系,如当一个变量发生变化,另一个变量也会变化。相关性分为如下两种:
a. 正相关:一个变量的增加导致另一个变量的增加
b. 负相关:一个变量的增加导致另一个变量的减少
2. Spark支持两种相关性算法:Pearson和Spearman
a. Pearson作用于两个连续值,如一个人的身高和体重,或一个房子的面积和价格
b. Spearman处理一个连续值和一个分类值,如zip码和房间价格
四. 理解特征工程
1. 数据清洗(数据准备)和特征提取在数据处理中耗费的时间最多。
2. 特征选取考虑两个主要方面:特征质量和特征数量
3. 模型不关心标称特征,但对比例或数字特征非常敏感。在很多真实建模场景中,可能存在成百上千的规模差异。特征规模伸缩有如下方法:
a. 将特征值除以最大值,使得每个特征都在特定范围内
b. 将特征除以一个区间,即最大值-最小值
c. 通过平均值减去特征值,然后除以区间
d. 通过平均值减去特征值,然后除以标注偏差
4. TF-IDF用于反映一个词在文档集中的重要程度。其计算一个词的IT-IDF的伪计算公式可表示为:
TF-IDF = TF*IDF = 在某一文档中词条出现次数/该文档中所有的词条数目 * log(语料库的文档总数 / 包含该词条的文档数 + 1)
示例:com.ws.spark.study.ml.MLTest "run tf-idf"
五. 理解Spark ML
1. 机器的学习过程主要有:
a. 为机器学习算法提供训练数据集以及超参数
b. 训练的结果是一个模型
c. 然后用模型预测测试数据
2. Spark ML中,一个预估器以DataFrame方式提供(通过fit方法),训练后输出是一个Transformer。Transformer以DateFrame作为输入,并且
输出一个转换后的DataFrame。Transformer不只限于预测模型,也可用于特征转换。
3. 机器学习pipeline定义为阶段(stage)序列,每个阶段可以是预估器(estimator)或转换器(Transformaer)
示例:com.ws.spark.study.ml.MLTest "test LogisticRegression"
六. 理解超参数调整
1. 每一个机器学习算法在开始训练前,均需要设置一些超参数,如步长、学习率、回归参数等。这些超参数一般手工设置。
示例:com.ws.spark.study.ml.MLTest "test hyperparameter tuning"
七. MLLib监督学习-回归
1. 监督学习分为两类:回归和分类
a. 回归:预测连续值的输出,如房价
b. 分类:预测离散值(二值或多值)的输出(label),例如是否为垃圾邮件,或邮件标记为重要,紧急,不重要等
2. 回归的流程一般包含:
1) 获取标记数据
a. 标记数据如何获取取决于使用场景
b. 标记数据量需要大于特征数,如果过小的话,导致过拟合。
2) 将标记数据划分两部分
a. 将数据按特定比例随机划分,且每次划分时均需随机,以避免偏见
b. 划分的第一部分为训练集,第二部分为测试集。有时也会分为三部分:训练集、交叉验证、测试集,此时测试集只用于测量准确度
3) 使用算法训练数据集。训练后结果称为模型。模型的训练和创建也涉及到超参数(简单理解为配置参数)调整。
4) 使用训练好的模型对测试集预测
3. 仅有一个特征进行预测时,称为二变量分析;当有多个特征时,称为多变量分析。实际上,我们可以有任意数量的特征,例如SVM允许无穷的特征量。
示例:线性回归 com.ws.spark.study.ml.MLTest "test linear regression"
4.. 理解损失函数
1) 线性回归中的损失函数:均方误差 -> J(θ0,θ1) = sum((hi(x) - yi)^2) / 2m,其中θ0表示y轴截距,θ1表示斜率,hi(x)表示第i个元素x对应的预测值
yi表示第i个元素对应的真实值
2) 求取线性损失函数获取最优值,采用梯度下降方法。在Spark中的实现是随机梯度下降(stochastic gradient descent)。梯度下降是众多爬山演算法中的一部分,
另一个算法在Spark中引入的是限制内存尺度法(limited-memory BFGS)。这些算法用于寻找一个函数梯度为0的优化点。
5. Lasso线性回归
1) Lasso是线性回归收缩和选择方法,通常使用系数绝对值之和的上限来最小化平方误差和。之前使用的是普通最小二乘(ordinary least
squares),OLS存在两大挑战:
a. 预测精度:使用OLS的预测通常由小的预测偏差和大的方差。可通过收缩系数(甚至将其设置为0)提高预测精度,代价就是增加偏差
b. 解释性:由于有大量的预测因子可供使用,希望找到具有最强效果的子集(相关性)
2) 偏差是预测结果与实际值的距离的估计值,方差是不同预测值之间预测值差异的估计值。一般的增加特征维度,可以减少偏差。
减少特征维度或增加数据集可以减少方差
3) Lasso的主要特性在于:任何认定为无用特征,Lasso均将其系数设置为0,来将其移出等式。
示例:com.ws.spark.study.ml.MLTest "test lasso regression"
6. ridge线性回归
1) 与Lasso将特征系数设置为0,而ridge,特征将会被处罚,但不会设置为0
示例:com.ws.spark.study.ml.MLTest "test ridge regression"
八. MLLib监督学习-分类
1. 线性回归在回归任务中表现较好,但在分类任务中存在的限制如下:
a. 拟合过程很容易受到异常值的影响
b. 无法保证假设函数满足0-1范围
2. 逻辑回归保障假设函数位于0-1,即 0 <= h(x) <= 1。逻辑回归表示为h(x) = g(θx),其中g为sigmoid函数,定义为g(t) = 1 / (1 + e^-t)
3. Spark MLLib中有两个类支持逻辑回归:LogisticRegressionWithSGD和LogisticRegressionWithLBFGS
示例:com.ws.spark.study.ml.MLTest "test logistic regression"
4. SVM
示例:com.ws.spark.study.ml.MLTest "test svm algorithm"
5. 决策树
1) 决策时的优点:
a. 容易理解和解释
b. 分类和连续特征均起作用
c. 缺失特征也起作用
d. 不需要特征伸缩
2) Spark使用三种方法来决定异质:基尼异质(Gini impurity,用于分类),熵(用于分类),方差(用于回归)
示例:
a. com.ws.spark.study.ml.MLTest "test decision tree"
b. com.ws.spark.study.ml.MLTest "test decision tree2
6. 随机森林
1) 有时,一个决策树并不够,需要一组决策树来提供更强大的模型,这被称为集成学习算法。典型地有随机森林。
2) 随机森林提供K个树,为每个树提供训练数据的随机子集S。每个树仅使用特征的子集。当需要进行预测时,会在树之间进行多数投票,
并将投票结果作为预测结果。
3) 随机森林的作用方式体现在两方面的随机选取:数据的子集和拆分数据的特征子集
示例:com.ws.spark.study.ml.MLTest "test random forest"
7. 梯度提升树GBT
1) 另一个集成学习算法是梯度提升树(GBT)。GBT每次训练一个棵树,其中每一颗新树改进了先前训练的树的缺点。
2) 由于GBT每次只训练一棵树,因此相比随机森林算法,其耗时更长。
3) 注意:目前Spark MLLib 2.2.4中的GBT仅支持二值分类,不支持多值分类
示例:com.ws.spark.study.ml.MLTest "test GBT"
8. 朴素贝叶斯
1) 朴素贝叶斯假设特征之间是相互独立的
示例:com.ws.spark.study.ml.MLTest "test native bayes"
九. 无监督学习
1. 与有监督学习不同的是,无监督学习没有对应的标注数据,需要算法基于自身找到一个结构。
2. 无监督学习最常见的就是聚类。聚类是将数据分为多个组,每个组中的数据彼此相似。
3. K-means算法是常用的聚类算法
a. 首先随机选择两个点作为聚类质心
b. 群集分配:将遍历每个数据点,并将其分配到更接近的质心,并作为所呈现的聚类中一部分
c. 移动质心:会将质心移动到聚类中的数据点的均值位置
4. PCA降维
1) 降维即降低特征的维度,可用于数据压缩和可视化。降维可以提高算法效率,减少磁盘内存占用,也可以将高度相关维度减少到1个。
2) 当将1000个特征投影为100个特征时,这100个特征可能无法给出真实的含义。
3) 使用PCA前,可能需要进行特征归一化,即将不同量级的特征归一化到同等量级。
4) 常见有4种简单方法进行特征归一化
a. 将特征值除以最大值,将每个特征置于-1≤x≤1范围内
b. 将特征值除以范围,即最大值 - 最小值
c. 通过平均值减去特征值,然后将其除以范围
d. 通过平均值减去特征值,然后将其除以标准偏差
5. 奇异值分解降维
1) 基本思想是采用高维度,高度可变的数据点集合,并将其缩小到低维空间,从而更清楚地暴露原始数据的结构,并从最小量的变化中对其进行排序。
2) 奇异值分解常用语NLP中。SVD基于线性代数的鼎力,矩形矩阵A可以分解为三个矩阵的乘积:正交矩阵U,对角矩阵S和正交矩阵V
参考代码:

package com.ws.spark.study.ml import org.apache.spark.sql.SparkSession import org.apache.spark.{SparkConf, SparkContext} import org.scalatest.{BeforeAndAfterAll, FlatSpec} class MLTest extends FlatSpec with BeforeAndAfterAll { private var sc: SparkContext = null private var session: SparkSession = null private val BASE_DATA_DIR = "E:/IntelliJWorkSpace/sparkstudy/data" override protected def beforeAll(): Unit = { sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("test")) session = SparkSession.builder().appName("testCorrelation").master("local[1]").getOrCreate() } override protected def afterAll(): Unit = { sc.stop() session.stop() } def testSpark(): Unit ={ println(sc.version) sc.parallelize(List(1,2,3,4)).foreach(f => { val v = f + 10 println(v) }) sc.stop() } /** * 计算向量 */ def testVector(): Unit ={ // 显示导入MLlib向量 import org.apache.spark.ml.linalg.Vectors // 创建稠密型向量 val denseHouse = Vectors.dense(4500d, 41000d, 4d) // 创建稀疏型向量 val sparseHouse = Vectors.sparse(3, Array(0, 1, 2), Array(4500d, 41000d, 4d)) // 创建为值0的向量 val zeros = Vectors.zeros(3) } /** * 测试特征的相关性 */ "The correlation of price and size " should "> 0.85" ignore { val houses = session.createDataFrame(Seq( (1620000d,2100), (1690000d,2300), (1400000d,2046), (2000000d,4314), (1060000d,1244), (3830000d,4608), (1230000d,2173), (2400000d,2750), (3380000d,4010), (1480000d,1959) )).toDF("price", "size") // 计算corr时,默认使用pearson算法, corr提供了指定算法的参数 // 0.85表示2个特征有很强的正相关 assert(houses.stat.corr("price", "size") > 0.85) assert(houses.stat.corr("price", "size", "pearson") > 0.85) } /** * 测试TF-IDF */ "run tf-idf" should "success" ignore { import org.apache.spark.ml.feature._ // 数据加载 // val sentenceData = session.read.option("delimiter", " ").csv("file://test.csv").toDF("sentence") val sentenceData = session.read.json(s"file:///$BASE_DATA_DIR/video.json").toDF("name") // 创建转换器对句子分词 val tokenizer = new Tokenizer().setInputCol("name").setOutputCol("words") // 对句子分词 val wordsData = tokenizer.transform(sentenceData) // 创建哈希转换器, setNumFeatures表示设置hash的维数 val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20) // 将句子中的单词映射为词频 val featurizedData = hashingTF.transform(wordsData) // 创建IDF评估器 val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features") // 训练IDF模型 val idfModel = idf.fit(featurizedData) // 基于IDF模型重新调整TF模型 val rescaleData = idfModel.transform(featurizedData) rescaleData.select("rawFeatures").limit(10).show() } /** * 测试预估器和转换器 */ "test LogisticRegression" should "success" ignore { import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.linalg.{Vectors, Vector} // 创建球员信息 val lebron = (1.0, Vectors.dense(80.0, 250.0)) val tim = (0.0, Vectors.dense(70.0, 150.0)) val brittany = (1.0, Vectors.dense(80.0, 207.0)) val stacey = (0.0, Vectors.dense(65.0, 120.0)) // 创建训练DataFrame val tranning = session.createDataFrame(Seq(lebron, tim, brittany, stacey)) .toDF("label", "features") // 创建逻辑回归预估器 val estimator = new LogisticRegression() // 通过匹配预估器与训练DataFrame来创建一个转换器 val transformer = estimator.fit(tranning) // 创建测试数据 val john = Vectors.dense(90.0, 270.0) val tom = Vectors.dense(62.0, 120.0) val test = session.createDataFrame(Seq((1.0, john), (0.0, tom))).toDF("label", "features") // 使用转换器预测 val results = transformer.transform(test) // 打印结果DataFrame的scehma,可以看到,处理prediction,转换器增加了rawPrediction和概率两个列 results.printSchema() results.show() // 只显示features和prediction val predictions = results.select("features", "prediction") predictions.show() } /*** * 测试超参数调整 */ "test hyperparameter tuning" should "success" ignore { import org.apache.spark.ml.regression.LinearRegression import org.apache.spark.ml.evaluation.RegressionEvaluator import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit} val data = session.read.format("libsvm").load(s"file:///$BASE_DATA_DIR/iris_libsvm.txt") // 将数据拆分为训练数据和预测数据 val Array(training, test) = data.randomSplit(Array(0.7, 0.3), seed = 10000) // 初始化线性回归 val lr = new LinearRegression().setMaxIter(10) // 使用ParamGridBuilder创建参数网格来搜索。TrainValidationSplit将尝试所有的数值组合 // 以决定使用evaluator最好的模型 val paramGrid = new ParamGridBuilder() .addGrid(lr.regParam, Array(0.1, 0.01)) .addGrid(lr.fitIntercept) .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0)) .build() // 创建训练验证拆分 // TrainValidationSplit需要一个Estimator, 一组Estimator ParamMap和一个Evaluator val trainValidationSplit = new TrainValidationSplit() .setEstimator(lr) .setEvaluator(new RegressionEvaluator()) .setEstimatorParamMaps(paramGrid) .setTrainRatio(0.8) // 80%的数据用于训练,剩余的20%用于预测 // 训练模型,选择最好的一组参数 val model = trainValidationSplit.fit(training) // 基于测试集预测 val predictions = model.transform(test) predictions.select("features", "label", "prediction").show() // 评估预测结果 val evaluator = new RegressionEvaluator() println(evaluator.evaluate(predictions)) } /** * 线性回归 */ "test linear regression" should "success" ignore { import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.regression.LinearRegression // 房价作为label val points = session.createDataFrame(Seq( (1620000, Vectors.dense(2100)), (1690000, Vectors.dense(2300)), (1400000, Vectors.dense(2046)), (2000000, Vectors.dense(4314)), (1060000, Vectors.dense(1244)), (3830000, Vectors.dense(4608)), (1230000, Vectors.dense(2173)), (2400000, Vectors.dense(2750)), (3380000, Vectors.dense(4010)), (1480000, Vectors.dense(1959)) )).toDF("label", "features") // 初始化线性回归 val lr = new LinearRegression() // 使用数据集训练 val model = lr.fit(points) // 创建测试数据 val test = session.createDataFrame(Seq(Vectors.dense(2100.0)).map(Tuple1.apply)).toDF("features") // 预测 val predictions = model.transform(test) predictions.show() } "test linear regression2" should "success" ignore { import org.apache.spark.ml.regression.LinearRegression import org.apache.spark.ml.evaluation.RegressionEvaluator val data = session.read.format("libsvm").load("s3a://sparkcookbook/housingdata/realestate.libsvm") val Array(trainning, test) = data.randomSplit(Array(0.7, 0.3)) val lr = new LinearRegression() val model = lr.fit(trainning) val predictions = model.transform(test) val evaluator = new RegressionEvaluator() evaluator.evaluate(predictions) } /** * 测试线性回归中的Lasso算法 */ "test lasso linear regression" should "success" ignore{ import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.regression.LinearRegression val points = session.createDataFrame(Seq( (1d, Vectors.dense(5.0,3,1,2,1,3,2,2,1)), (2d, Vectors.dense(9.0,8,8,9,7,9,8,7,9)) )).toDF("label", "features") // 将setElasticNetParam设置1表示Lasso或L1归一化 val lr = new LinearRegression().setMaxIter(10).setFitIntercept(false).setRegParam(.3).setElasticNetParam(1) val model = lr.fit(points) // [0.18531922804008738,0.013616539127918346,0.0,0.0,0.0,0.009917465418232699,0.0,0.0,0.0] println(model.coefficients) } /** * 测试线性回归中的Ridge算法 */ "test ridge linear regression" should "success" ignore{ import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.regression.LinearRegression val points = session.createDataFrame(Seq( (1d, Vectors.dense(5.0,3,1,2,1,3,2,2,1)), (2d, Vectors.dense(9.0,8,8,9,7,9,8,7,9)) )).toDF("label", "features") // 将setElasticNetParam设置1表示Lasso或L1归一化 val lr = new LinearRegression().setMaxIter(10).setFitIntercept(false).setRegParam(.3).setElasticNetParam(0) val model = lr.fit(points) // [0.11329331633450115,0.03937073300046667,0.002369276442275244,0.010416987598811298,0.0043289885742031475,0.026236646722551396,0.015282817648377314,0.023597219133656366,0.0011928984792447094] println(model.coefficients) } /*** * 测试逻辑回归 */ "test logistic regression" should "success" ignore { import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.classification.{LogisticRegression, BinaryLogisticRegressionSummary} val trainingDataSet = session.createDataFrame(Seq( (0.0,Vectors.dense(0.245)), (0.0,Vectors.dense(0.247)), (1.0,Vectors.dense(0.285)), (1.0,Vectors.dense(0.299)), (1.0,Vectors.dense(0.327)), (1.0,Vectors.dense(0.347)), (0.0,Vectors.dense(0.356)), (1.0,Vectors.dense(0.36)), (0.0,Vectors.dense(0.363)), (1.0,Vectors.dense(0.364)), (0.0,Vectors.dense(0.398)), (1.0,Vectors.dense(0.4)), (0.0,Vectors.dense(0.409)), (1.0,Vectors.dense(0.421)), (0.0,Vectors.dense(0.432)), (1.0,Vectors.dense(0.473)), (1.0,Vectors.dense(0.509)), (1.0,Vectors.dense(0.529)), (0.0,Vectors.dense(0.561)), (0.0,Vectors.dense(0.569)), (1.0,Vectors.dense(0.594)), (1.0,Vectors.dense(0.638)), (1.0,Vectors.dense(0.656)), (1.0,Vectors.dense(0.816)), (1.0,Vectors.dense(0.853)), (1.0,Vectors.dense(0.938)), (1.0,Vectors.dense(1.036)), (1.0,Vectors.dense(1.045))) ).toDF("label", "features") val lr = new LogisticRegression() val model = lr.fit(trainingDataSet) // 创建训练总结 val traningSummary = model.summary // 强制转换为二值逻辑回归总结 val binarySummary = traningSummary.asInstanceOf[BinaryLogisticRegressionSummary] // 打印ROC(Receiver Operating Characteristic)下的区域,ROC是一个用于访问预测精度的统计工具 println(binarySummary.areaUnderROC) } /** * SVM算法 */ "test svm algorithm" should "success" ignore { import org.apache.spark.mllib.util.MLUtils import org.apache.spark.mllib.classification.SVMWithSGD val data = MLUtils.loadLibSVMFile(sc, s"file:///$BASE_DATA_DIR/diabetes.libsvm") // 统计数据量 assert(768 === data.count()) // 将数据拆分为训练和测试 val trainingAndTest = data.randomSplit(Array(0.5, 0.5)) val trainingData = trainingAndTest(0) val testData = trainingAndTest(1) // 训练算法,构建模型的迭代次数为100 // 可以设置不同的迭代次数,但在一个确定的拐点时,你将看到结果开始收敛 val model = SVMWithSGD.train(trainingData, 100) // 结果第一条测试数据 val label = model.predict(testData.first().features) println(s"${testData.first().features} $label") // 预测整个测试集,输出(测试label, 真实label) val predictsAndLabels = testData.map(f => (model.predict(f.features), f.label)) println(predictsAndLabels.filter(p => p._1 != p._2).count()) } /** * 决策树 */ "test decision tree" should "success" ignore { import org.apache.spark.mllib.tree.DecisionTree import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.tree.configuration.Algo._ import org.apache.spark.mllib.tree.impurity.Entropy // 获取训练数据,并封装在LabeledPoint val data = sc.parallelize(Seq( (0.0,1.0,1.0,2.0), (0.0,1.0,1.0,1.0), (0.0,1.0,1.0,0.0), (0.0,0.0,1.0,2.0), (0.0,0.0,1.0,0.0), (1.0,0.0,0.0,2.0), (1.0,0.0,0.0,1.0), (0.0,0.0,0.0,0.0) )).map(f => LabeledPoint(f._1, Vectors.dense(Array(f._2, f._3, f._4)))) // 训练模型 val model = DecisionTree.train(data, Classification, Entropy, 3) // 创建测试向量 val v = Vectors.dense(0.0, 1.0, 0.0) assert(0.0 === model.predict(v)) } // 使用ml下的决策树算法 "test decision tree2" should "success" ignore { import org.apache.spark.ml.classification.DecisionTreeClassifier import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator val data = session.read.format("libsvm").option("inferschema", "true") .load(s"file:///$BASE_DATA_DIR/diabetes.libsvm") val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3)) val dt = new DecisionTreeClassifier().setImpurity("entropy") val model = dt.fit(trainingData) val predictions = model.transform(testData) val evaluator = new BinaryClassificationEvaluator() val auroc = evaluator.evaluate(predictions) println(s"Area under ROC = $auroc") } /** * 测试随机森林 */ "test random forest" should "success" ignore { import org.apache.spark.ml.classification.RandomForestClassifier import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator val data = session.read.format("libsvm").load(s"$BASE_DATA_DIR/rf.libsvm") val Array(training, test) = data.randomSplit(Array(0.7, 0.3)) val rf = new RandomForestClassifier().setNumTrees(3) val model = rf.fit(training) val predictions = model.transform(test) val evaluator = new MulticlassClassificationEvaluator().setMetricName("accuracy") val accuracy = evaluator.evaluate(predictions) println(accuracy) println(model.toDebugString) } /** * 测试梯度提升树 */ "test GBT" should "success" ignore { import org.apache.spark.ml.classification.GBTClassifier import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator val data = session.read.format("libsvm").load(s"file:///$BASE_DATA_DIR/diabetes.libsvm") val Array(training, test) = data.randomSplit(Array(0.7, 0.3)) val gbt = new GBTClassifier().setMaxIter(10) val model = gbt.fit(training) val predictions = model.transform(test) val evaluator = new MulticlassClassificationEvaluator().setMetricName("accuracy") val accuracy = evaluator.evaluate(predictions) println(accuracy) } /** * 朴素贝叶斯 */ "test native bayes" should "success" in { import org.apache.spark.ml.classification.NaiveBayes import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator val data = session.read.format("libsvm").load(s"file:///$BASE_DATA_DIR/diabetes.libsvm") val Array(training, test) = data.randomSplit(Array(0.7, 0.3)) val model = new NaiveBayes().fit(training) val predictions = model.transform(test) val evaluator = new MulticlassClassificationEvaluator() println(evaluator.evaluate(predictions)) } /** * 测试K-means */ "test k-means" should "success" in { import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.clustering.KMeans val data = session.createDataFrame( Seq( Vectors.dense(12839,2405), Vectors.dense(10000,2200), Vectors.dense(8040,1400), Vectors.dense(13104,1800), Vectors.dense(10000,2351), Vectors.dense(3049,795), Vectors.dense(38768,2725), Vectors.dense(16250,2150), Vectors.dense(43026,2724), Vectors.dense(44431,2675), Vectors.dense(40000,2930), Vectors.dense(1260,870), Vectors.dense(15000,2210), Vectors.dense(10032,1145), Vectors.dense(12420,2419), Vectors.dense(69696,2750), Vectors.dense(12600,2035), Vectors.dense(10240,1150), Vectors.dense(876,665), Vectors.dense(8125,1430), Vectors.dense(11792,1920), Vectors.dense(1512,1230), Vectors.dense(1276,975), Vectors.dense(67518,2400), Vectors.dense(9810,1725), Vectors.dense(6324,2300), Vectors.dense(12510,1700), Vectors.dense(15616,1915), Vectors.dense(15476,2278), Vectors.dense(13390,2497.5), Vectors.dense(1158,725), Vectors.dense(2000,870), Vectors.dense(2614,730), Vectors.dense(13433,2050), Vectors.dense(12500,3330), Vectors.dense(15750,1120), Vectors.dense(13996,4100), Vectors.dense(10450,1655), Vectors.dense(7500,1550), Vectors.dense(12125,2100), Vectors.dense(14500,2100), Vectors.dense(10000,1175), Vectors.dense(10019,2047.5), Vectors.dense(48787,3998), Vectors.dense(53579,2688), Vectors.dense(10788,2251), Vectors.dense(11865,1906) ).map(Tuple1.apply)).toDF("features") // 创建k-means预估器 val kmeans = new KMeans().setK(4).setSeed(1L) // 模型训练 val model = kmeans.fit(data) val test = session.createDataFrame( Seq( Vectors.dense(876, 665), Vectors.dense(15750,1120), Vectors.dense(38768,2725), Vectors.dense(69696,2750) ).map(Tuple1.apply)).toDF("features") val prediction = model.transform(test) println(prediction.show()) } /** * 1. 使用PCA将 house size 和 lot size 合并为一个特征 房屋的z密度 * 2. 通过平均值减去特征值,然后将其除以标准偏差,以进行特征归一化 * 3. 使用线性回归来观察z密度对房价的影响程度 */ "test pca" should "success" ignore { import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.linalg.distributed._ val data = sc.textFile("s3a://sparkcookbook/saratoga/scaledhousedata.csv") val parsedData = data.map(line => Vectors.dense(line.split(",").map(_.toDouble))) val mat:RowMatrix = new RowMatrix(parsedData) // 计算一个主成分 val pc = mat.computePrincipalComponents(1) val projected = mat.multiply(pc) val projectedRDD = projected.rows projectedRDD.saveAsTextFile("phdata") } /** * 奇异值分解 */ "test svd" should "success" in { import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.linalg.distributed.RowMatrix val data = sc.parallelize(Seq( Vectors.dense(1,2), Vectors.dense(2,3), Vectors.dense(1,4), Vectors.dense(1,0), Vectors.dense(1,0), Vectors.dense(1,3), Vectors.dense(1,2), Vectors.dense(1,0), Vectors.dense(1,2), Vectors.dense(0,3), Vectors.dense(0,1), Vectors.dense(0,2) )) val mat = new RowMatrix(data) val svd = mat.computeSVD(2, true) val u = svd.U val s = svd.s val v = svd.V println(s"$u $s $v") } }