Spark Machine Learning
1. Introduction
MLlib is Spark's machine learning library. Its goal is to make practical machine learning easy to use and scalable. At a high level, it provides the following tools:
- ML algorithms: common learning algorithms such as classification, regression, clustering, and collaborative filtering.
- Featurization: feature extraction, transformation, dimensionality reduction, and selection.
- Pipelines: tools for constructing, evaluating, and tuning ML pipelines.
- Persistence: saving and loading algorithms, models, and pipelines.
- Utilities: linear algebra, statistics, data handling, and so on.
2. Basic concepts
- vector: a mathematical vector, represented as a one-dimensional array. Vectors come in two forms: sparse vectors and dense vectors.
- sparse vector: stores only the vector size plus the indices and values of the non-zero elements.
- dense vector: stores the value of every element; the index is implied by the position (see the sketch after this list).
- LabeledPoint: holds a label of type Double and a features field of type Vector.
- Rating: a (user, product, rating) triple, used by ALS collaborative filtering.
- Model: the result of fitting (training) an algorithm on data; a fitted model can transform new data into predictions.
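A minimal sketch of these types, using the RDD-based spark.mllib API (the DataFrame-based spark.ml package provides equivalent Vectors and LabeledPoint classes):
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.recommendation.Rating
// dense vector: every value is stored, the index is implied by the position
val dv = Vectors.dense(1.0, 0.0, 3.0)
// sparse vector: stores the size (3) plus the indices and values of the non-zero entries
val sv = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
// LabeledPoint: a Double label plus a feature vector
val lp = LabeledPoint(1.0, sv)
// Rating: (user id, product id, rating), used by ALS collaborative filtering
val r = Rating(1, 101, 4.5)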
3. Predicting wine quality with linear regression
3.1 Red wine dataset description
Each record consists of 12 semicolon-separated fields:
FixedAcidity, VolatileAcidity, CitricAcid, ResidualSugar, Chlorides,
FreeSulfurDioxide, TotalSulfurDioxide, Density, PH, Sulphates,
Alcohol, Quality
---------------------------------------------------
7.4;0.7;0;1.9;0.076;11;34;0.9978;3.51;0.56;9.4;5
7.8;0.88;0;2.6;0.098;25;67;0.9968;3.2;0.68;9.8;5
7.8;0.76;0.04;2.3;0.092;15;54;0.997;3.26;0.65;9.8;5
11.2;0.28;0.56;1.9;0.075;17;60;0.998;3.16;0.58;9.8;6
7.4;0.7;0;1.9;0.076;11;34;0.9978;3.51;0.56;9.4;5
7.4;0.66;0;1.8;0.075;13;40;0.9978;3.51;0.56;9.4;5
7.9;0.6;0.06;1.6;0.069;15;59;0.9964;3.3;0.46;9.4;5
...
3.2 Predicting quality with linear regression
/**
* Created by Administrator on 2018/8/5.
*/
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.{Row, SparkSession}
object MLLRWhiteDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("ml_linearRegress")
conf.setMaster("local[*]")
//create the SparkSession
val spark = SparkSession.builder().config(conf).getOrCreate()
//1. define a case class describing one wine record
case class Wine(FixedAcidity: Double,
VolatileAcidity: Double,
CitricAcid: Double,
ResidualSugar: Double,
Chlorides: Double,
FreeSulfurDioxide: Double,
TotalSulfurDioxide: Double,
Density: Double,
PH: Double,
Sulphates: Double,
Alcohol: Double,
Quality: Double)
//2. load the red-wine CSV file and transform it into an RDD
val file = "file:///D:/ml/data/red.csv"
val wineDataRDD = spark.sparkContext.textFile(file)
.map(_.split(";"))
.map(w => Wine(w(0).toDouble,
w(1).toDouble,
w(2).toDouble,
w(3).toDouble,
w(4).toDouble,
w(5).toDouble,
w(6).toDouble,
w(7).toDouble,
w(8).toDouble,
w(9).toDouble,
w(10).toDouble,
w(11).toDouble))
//import the SparkSession's implicit conversions so the RDD can be converted to a DataFrame
import spark.implicits._
//build a DataFrame of (Double, Vector) pairs
//training dataset
val trainingDF = wineDataRDD.map(w =>
(
w.Quality,
Vectors.dense(
w.FixedAcidity,
w.VolatileAcidity,
w.CitricAcid,
w.ResidualSugar,
w.Chlorides,
w.FreeSulfurDioxide,
w.TotalSulfurDioxide,
w.Density,
w.PH,
w.Sulphates,
w.Alcohol))
)
.toDF("label", "features")
trainingDF.show(100, false)
//3. create the linear regression estimator
val lr = new LinearRegression()
//4. set estimator parameters
lr.setMaxIter(2)
//5. fit (train) the model
val model = lr.fit(trainingDF)
//6. build a test dataset
val testDF = spark.createDataFrame(Seq(
(5.0, Vectors.dense(7.4, 0.7, 0.0, 1.9, 0.076, 25.0, 67.0,
0.9968, 3.2, 0.68, 9.8)),
(5.0, Vectors.dense(7.8, 0.88, 0.0, 2.6, 0.098, 11.0, 34.0,
0.9978, 3.51, 0.56, 9.4)),
(7.0, Vectors.dense(7.3, 0.65, 0.0, 1.2, 0.065, 15.0, 18.0,
0.9968, 3.36, 0.57, 9.5))))
.toDF("label", "features")
//7. register the test dataset as a temporary view
testDF.createOrReplaceTempView("test")
//8. apply the model to the test data
val result = model.transform(testDF)
result.createOrReplaceTempView("_result")
val rawWine = Seq(
(0,Vectors.dense(7.4, 0.7, 0.0, 1.9, 0.076, 25.0, 67.0, 0.9968,
3.2, 0.68, 9.8)),
(0,Vectors.dense(7.8, 0.88, 0.0, 2.6, 0.098, 11.0, 34.0, 0.9978,
3.51, 0.56, 9.4)),
(0,Vectors.dense(7.3, 0.65, 0.0, 1.2, 0.065, 15.0, 18.0, 0.9968,
3.36, 0.57, 9.5))
)
//unlabeled data (the label column is just a 0 placeholder)
val noLabelData = spark.createDataFrame(rawWine).toDF("label", "features")
model.transform(noLabelData).show(20, false)
}
}
3.3 Model persistence and loading
import org.apache.spark.ml.regression.LinearRegressionModel
//save the trained model
model.save("file:///d:/ml/lr")
//load the model back
val model = LinearRegressionModel.load("file:///d:/ml/lr")
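save() fails if the target directory already exists. A minimal round-trip sketch (assuming the path above and that model and testDF from section 3.2 are in scope) that overwrites the old model and reuses the loaded one:
import org.apache.spark.ml.regression.LinearRegressionModel
// overwrite any previously saved model at the same location
model.write.overwrite().save("file:///d:/ml/lr")
// later, possibly in a different job: load the model and apply it to new data
val reloaded = LinearRegressionModel.load("file:///d:/ml/lr")
reloaded.transform(testDF).show(20, false)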
4. Wine quality classification with logistic regression
import org.apache.spark.SparkConf
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
/**
 * Classify white wines as good or bad using logistic regression.
 */
object MLLogicRegressWhiteWinDemo1 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("ml_linearRegress")
conf.setMaster("local[*]")
//create the SparkSession
val spark = SparkSession.builder().config(conf).getOrCreate()
//load the white-wine CSV file
val file = "file:///d:/ml/data/white.csv"
val rdd1 = spark.sparkContext.textFile(file)
val rdd2 = rdd1.map(line=>{
val arr = line.split(";")
(
// binarize the label: quality above 7 counts as good (1), otherwise bad (0)
if (arr(11).toDouble > 7) 1 else 0,
Vectors.dense(
arr(0).toDouble ,
arr(1).toDouble ,
arr(2).toDouble ,
arr(3).toDouble ,
arr(4).toDouble ,
arr(5).toDouble ,
arr(6).toDouble ,
arr(7).toDouble ,
arr(8).toDouble ,
arr(9).toDouble ,
arr(10).toDouble)
)
})
import spark.implicits._
val df = rdd2.toDF("label" , "features")
//split the data; 0.8 and 0.2 are the split weights for training and test
val split = df.randomSplit(Array(0.8, 0.2))
//training data
val trainData = split(0)
//test data
val testData = split(1)
df.show(100, false)
//create the logistic regression estimator
val lr = new LogisticRegression()
lr.setMaxIter(10).setRegParam(0.01)
//fit the model
val model = lr.fit(trainData)
val testResult = model.transform(testData)
testResult.show(100, false)
}
}
5. Classification with Naive Bayes
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.sql.SparkSession
object NaiveBayesExample {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.master("local[*]")
.appName("NaiveBayesExample")
.getOrCreate()
// Load the data stored in LIBSVM format as a DataFrame.
val file = "file:///D:/downloads/bigdata/spark-2.1.0-bin-hadoop2.7/data/mllib/sample_libsvm_data.txt"
val data = spark.read.format("libsvm").load(file)
//split the data into training and test sets
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3), seed = 1234L)
//train a NaiveBayes model
val model = new NaiveBayes().fit(trainingData)
//apply the model to the test data
val predictions = model.transform(testData)
//display the predictions
predictions.show()
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
//print the accuracy
println("Test set accuracy = " + accuracy)
spark.stop()
}
}
6. Spam filtering
6.1 Scala implementation
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
val spam = sc.textFile("spam.txt")
val normal = sc.textFile("normal.txt")
// hash term frequencies into a 10,000-dimensional feature vector
val tf = new HashingTF(numFeatures = 10000)
//feature vectors for the spam emails
val spamFeatures = spam.map(email => tf.transform(email.split(" ")))
//feature vectors for the normal emails
val normalFeatures = normal.map(email => tf.transform(email.split(" ")))
//positive examples (spam, label 1)
val positiveExamples = spamFeatures.map(features => LabeledPoint(1, features))
//negative examples (normal, label 0)
val negativeExamples = normalFeatures.map(features => LabeledPoint(0, features))
//combine into a single training set
val trainingData = positiveExamples.union(negativeExamples)
trainingData.cache()
// Run Logistic Regression using the SGD algorithm.
val model = new LogisticRegressionWithSGD().run(trainingData)
// Test on a positive example (spam) and a negative one (normal).
val posTest = tf.transform(
"O M G GET cheap stuff by sending money to ...".split(" "))
val negTest = tf.transform(
"Hi Dad, I started studying Spark the other ...".split(" "))
println("Prediction for positive test example: " + model.predict(posTest))
println("Prediction for negative test example: " + model.predict(negTest))
6.2 Java implementation
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD;
import org.apache.spark.mllib.feature.HashingTF;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.regression.LabeledPoint;
JavaRDD<String> spam = sc.textFile("spam.txt");
JavaRDD<String> normal = sc.textFile("normal.txt");
final HashingTF tf = new HashingTF(10000);
//
JavaRDD<LabeledPoint> posExamples = spam.map(new Function<String, LabeledPoint>() {
public LabeledPoint call(String email) {
return new LabeledPoint(1, tf.transform(Arrays.asList(email.split(" "))));
}
});
JavaRDD<LabeledPoint> negExamples = normal.map(new Function<String, LabeledPoint>() {
public LabeledPoint call(String email) {
return new LabeledPoint(0, tf.transform(Arrays.asList(email.split(" "))));
}
});
JavaRDD<LabeledPoint> trainData = posExamples.union(negExamples);
trainData.cache(); // Cache since Logistic Regression is an iterative algorithm.
// Run Logistic Regression using the SGD algorithm.
LogisticRegressionModel model = new LogisticRegressionWithSGD().run(trainData.rdd());
// Test on a positive example (spam) and a negative one (normal).
Vector posTest = tf.transform(
Arrays.asList("O M G GET cheap stuff by sending money to ...".split(" ")));
Vector negTest = tf.transform(
Arrays.asList("Hi Dad, I started studying Spark the other ...".split(" ")));
System.out.println("Prediction for positive example: " + model.predict(posTest));
System.out.println("Prediction for negative example: " + model.predict(negTest));
7. KMeans clustering
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.sql.SparkSession
/**
 * KMeans clustering example.
 */
object KMeansExample {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName("kmean")
.getOrCreate()
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
// train a KMeans model with k = 2
val kmeans = new KMeans().setK(2).setSeed(1L)
val model = kmeans.fit(dataset)
// compute the Within Set Sum of Squared Errors (WSSSE)
val WSSSE = model.computeCost(dataset)
println(s"Within Set Sum of Squared Errors = $WSSSE")
// Shows the result.
println("Cluster Centers: ")
model.clusterCenters.foreach(println)
spark.stop()
}
}
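The example above only prints the cluster centers. To see which cluster each input row is assigned to, the fitted model can transform the dataset, which adds a "prediction" column (a small sketch, assuming the dataset and model from the code above; the name assigned is ours):
// the "prediction" column holds the cluster index assigned to each row
val assigned = model.transform(dataset)
assigned.select("features", "prediction").show(10, false)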
8. Tokenization
8.1 English tokenization
import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
object TokenizerExample {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName("TokenizerExample")
.getOrCreate()
val sentenceDataFrame = spark.createDataFrame(Seq(
(0, "Hi I heard about Spark"),
(1, "I wish Java could use case classes"),
(2, "Logistic,regression,models,are,neat")
)).toDF("id", "sentence")
val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val regexTokenizer = new RegexTokenizer()
.setInputCol("sentence")
.setOutputCol("words")
.setPattern("\\W") // split on non-word characters
val countTokens = udf { (words: Seq[String]) => words.length }
val tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")
.withColumn("tokens", countTokens(col("words"))).show(false)
val regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words")
.withColumn("tokens", countTokens(col("words"))).show(false)
spark.stop()
}
}
8.2 Chinese tokenization with the word library
- Add the Maven dependency:
<dependency>
    <groupId>org.apdplat</groupId>
    <artifactId>word</artifactId>
    <version>1.3</version>
</dependency>
- Segment text through the API:
import org.apdplat.word.WordSegmenter;
...
List<Word> words = WordSegmenter.seg("南京市的长江大桥是最长的大桥");
- Segmentation that keeps stop words:
List<Word> words = WordSegmenter.segWithStopWords("南京市的长江大桥是最长的大桥");
- Custom stop-word dictionary:
- Create mystop.txt under the resources directory
[resources/mystop.txt]
南京市 大桥
- Point the segmenter at the custom stop-word file in code:
WordConfTools.set("stopwords.path", "classpath:mystop.txt");
// after changing the dictionary path, reload the dictionaries
DictionaryFactory.reload();
List<Word> words = WordSegmenter.seg("南京市的长江大桥是最长的大桥");
9. TF-IDF example
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.sql.SparkSession
object TfIdfExample {
def main(args: Array[String]) {
val spark = SparkSession
.builder
.appName("TfIdfExample")
.getOrCreate()
val sentenceData = spark.createDataFrame(Seq(
(0.0, "Hi I heard about Spark"),
(0.0, "I wish Java could use case classes"),
(1.0, "Logistic regression models are neat")
)).toDF("label", "sentence")
val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val wordsData = tokenizer.transform(sentenceData)
val hashingTF = new HashingTF()
.setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)
val featurizedData = hashingTF.transform(wordsData)
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
/*
 * Note: Spark computes IDF as ln((D + 1) / (N + 1)), where D is the total
 * number of documents and N is the number of documents that contain the term.
 */
val idfModel = idf.fit(featurizedData)
val rescaledData = idfModel.transform(featurizedData)
rescaledData.select("label", "features").show()
spark.stop()
}
}
Note:
The IDF formula varies slightly across implementations: some use log base 10, some the natural log, and some add 1 only to the denominator. Spark adds 1 to both the numerator and the denominator.
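A quick hand check of Spark's convention (a sketch; the helper name idf is ours): with D = 3 documents in the example above, a term that appears in only one sentence gets ln((3 + 1) / (1 + 1)) = ln 2 ≈ 0.693, while a term that appears in every sentence gets ln(4 / 4) = 0.
// IDF under Spark's convention: ln((D + 1) / (N + 1)),
// where D = total number of documents, N = number of documents containing the term
def idf(totalDocs: Long, docsWithTerm: Long): Double =
  math.log((totalDocs + 1.0) / (docsWithTerm + 1.0))
println(idf(3, 1)) // term in 1 of 3 documents -> 0.6931...
println(idf(3, 3)) // term in all 3 documents  -> 0.0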
10. Pipelines
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
object PipelineExample {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName("PipelineExample")
.getOrCreate()
val training = spark.createDataFrame(Seq(
(0L, "a b c d e spark", 1.0),
(1L, "b d", 0.0),
(2L, "spark f g h", 1.0),
(3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")
val tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("features")
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.001)
val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))
val model = pipeline.fit(training)
model.write.overwrite().save("/tmp/spark-logistic-regression-model")
pipeline.write.overwrite().save("/tmp/unfit-lr-model")
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")
val test = spark.createDataFrame(Seq(
(4L, "spark i j k"),
(5L, "l m n"),
(6L, "spark hadoop spark"),
(7L, "apache hadoop")
)).toDF("id", "text")
// Make predictions on the test documents.
model.transform(test)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
println(s"($id, $text) --> prob=$prob, prediction=$prediction")
}
spark.stop()
}
}