
    Spark Machine Learning

    1. Introduction

    MLlib is Spark's machine learning library. Its goal is to make practical machine learning easy to use and scalable. At a high level, it provides the following tools:

    • ML algorithms

      Common learning algorithms such as classification, regression, clustering, and collaborative filtering.

    • Featurization

      Feature extraction, feature transformation, dimensionality reduction, and feature selection.

    • Pipelines

      Tools for constructing, evaluating, and tuning ML pipelines.

    • Persistence

      Saving and loading algorithms, models, and pipelines.

    • Utilities

      Linear algebra, statistics, data handling, and so on.

    2. Basic concepts

    • vector

      A vector in the mathematical sense, represented as a one-dimensional array. Vectors come in two forms, sparse vectors and dense vectors (see the sketch after this list).

      • sparse vector

        A sparse vector stores only the vector size plus the indices and values of its non-zero elements.

      • dense vector

        A dense vector stores the value of every element explicitly.

    • LabeledPoint

      Holds a Double label together with a Vector of features.

    • Rating

      The (user, product, rating) triple consumed by collaborative filtering (ALS).

    • Model

      The result of fitting an algorithm to a dataset; a fitted model can transform new data into predictions.
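
    The snippet below is a minimal sketch of these types (assuming Spark 2.x with the DataFrame-based spark.ml API; the values are arbitrary examples):

    import org.apache.spark.ml.feature.LabeledPoint
    import org.apache.spark.ml.linalg.Vectors

    // Dense vector: every position is stored explicitly
    val dv = Vectors.dense(1.0, 0.0, 3.0)
    // Sparse vector: size 3, with non-zero values at indices 0 and 2
    val sv = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
    // LabeledPoint: a Double label plus a feature vector
    val lp = LabeledPoint(1.0, sv)

    println(dv)  // [1.0,0.0,3.0]
    println(sv)  // (3,[0,2],[1.0,3.0])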

    3. Predicting wine quality with linear regression

    3.1 The red wine dataset

    Columns (semicolon-separated): FixedAcidity, VolatileAcidity, CitricAcid,
    ResidualSugar, Chlorides, FreeSulfurDioxide, TotalSulfurDioxide,
    Density, PH, Sulphates, Alcohol, Quality
    ---------------------------------------------------
    7.4;0.7;0;1.9;0.076;11;34;0.9978;3.51;0.56;9.4;5
    7.8;0.88;0;2.6;0.098;25;67;0.9968;3.2;0.68;9.8;5
    7.8;0.76;0.04;2.3;0.092;15;54;0.997;3.26;0.65;9.8;5
    11.2;0.28;0.56;1.9;0.075;17;60;0.998;3.16;0.58;9.8;6
    7.4;0.7;0;1.9;0.076;11;34;0.9978;3.51;0.56;9.4;5
    7.4;0.66;0;1.8;0.075;13;40;0.9978;3.51;0.56;9.4;5
    7.9;0.6;0.06;1.6;0.069;15;59;0.9964;3.3;0.46;9.4;5
    ...
    

    3.2 Predicting quality with linear regression

    /**
      * Created by Administrator on 2018/8/5.
      */
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.ml.regression.LinearRegression
    import org.apache.spark.ml.param.ParamMap
    import org.apache.spark.ml.linalg.{Vector, Vectors}
    import org.apache.spark.sql.{Row, SparkSession}
    
    object MLLRWhiteDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setAppName("ml_linearRegress")
        conf.setMaster("local[*]")
    
        // Create the SparkSession
        val spark = SparkSession.builder().config(conf).getOrCreate()
    
        // 1. Define a case class describing one row of wine data
        case class Wine(FixedAcidity: Double,
                        VolatileAcidity: Double,
                        CitricAcid: Double,
                        ResidualSugar: Double,
                        Chlorides: Double, 
                        FreeSulfurDioxide: Double,
                        TotalSulfurDioxide: Double, 
                        Density: Double, 
                        PH: Double,
                        Sulphates: Double,
                        Alcohol: Double,
                        Quality: Double)
    
        // 2. Load the red wine CSV file and convert it into an RDD of Wine
        val file = "file:///D:/ml/data/red.csv"
        val wineDataRDD = spark.sparkContext.textFile(file)
        .map(_.split(";"))
        .map(w => Wine(w(0).toDouble,
                       w(1).toDouble,
                       w(2).toDouble,
                       w(3).toDouble,
                       w(4).toDouble,
                       w(5).toDouble, 
                       w(6).toDouble, 
                       w(7).toDouble, 
                       w(8).toDouble,
                       w(9).toDouble,
                       w(10).toDouble,
                       w(11).toDouble))
    
        // Import the SparkSession implicits; they are needed to convert the RDD into a DataFrame
        import spark.implicits._
    
        // Build the training DataFrame as (Double label, Vector features) pairs
        val trainingDF = wineDataRDD.map(w => 
        (
          w.Quality, 
          Vectors.dense(
            w.FixedAcidity, 
            w.VolatileAcidity,
            w.CitricAcid, 
            w.ResidualSugar, 
            w.Chlorides, 
            w.FreeSulfurDioxide, 
            w.TotalSulfurDioxide,
            w.Density, 
            w.PH, 
            w.Sulphates, 
            w.Alcohol))
         )
        .toDF("label", "features")
        
        trainingDF.show(100, false)
    
        // 3. Create the linear regression estimator
        val lr = new LinearRegression()
    
        // 4. Set estimator parameters
        lr.setMaxIter(2)

        // 5. Fit (train) the model
        val model = lr.fit(trainingDF)

        // 6. Build a small test dataset
        val testDF = spark.createDataFrame(Seq(
          (5.0, Vectors.dense(7.4, 0.7, 0.0, 1.9, 0.076, 25.0, 67.0, 
                              0.9968, 3.2, 0.68, 9.8)),
          (5.0, Vectors.dense(7.8, 0.88, 0.0, 2.6, 0.098, 11.0, 34.0, 
                              0.9978, 3.51, 0.56, 9.4)),
          (7.0, Vectors.dense(7.3, 0.65, 0.0, 1.2, 0.065, 15.0, 18.0, 
                              0.9968, 3.36, 0.57, 9.5))))
        .toDF("label", "features")
        // 7. Register the test dataset as a temporary view
        testDF.createOrReplaceTempView("test")
        // 8. Apply the model to the test data
        val result = model.transform(testDF)
        result.createOrReplaceTempView("_result")
    
        val rawWine = Seq(
          (0,Vectors.dense(7.4, 0.7, 0.0, 1.9, 0.076, 25.0, 67.0, 0.9968,
                           3.2, 0.68, 9.8)),
          (0,Vectors.dense(7.8, 0.88, 0.0, 2.6, 0.098, 11.0, 34.0, 0.9978, 
                           3.51, 0.56, 9.4)),
          (0,Vectors.dense(7.3, 0.65, 0.0, 1.2, 0.065, 15.0, 18.0, 0.9968, 
                           3.36, 0.57, 9.5))
        )
        // Data without real labels (label column filled with 0)
        val noLabelData = spark.createDataFrame(rawWine).toDF("label", "features")
        model.transform(noLabelData).show(20 , false )
      }
    }
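
    After fit returns, the trained model's coefficients and training summary can be inspected. A small sketch (to be placed right after val model = lr.fit(trainingDF) in the listing above):

    // Inspect the fitted linear regression model
    println(s"coefficients: ${model.coefficients}, intercept: ${model.intercept}")
    val summary = model.summary
    println(s"RMSE = ${summary.rootMeanSquaredError}, r2 = ${summary.r2}")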
    
    

    3.3 Saving and loading a model

    // Save the model (writes a directory containing metadata and Parquet data)
    model.save("file:///d:/ml/lr")

    // Load the model back
    import org.apache.spark.ml.regression.LinearRegressionModel
    val model = LinearRegressionModel.load("file:///d:/ml/lr")
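
    Note that save throws an error if the target directory already exists. The same writer API used in section 10 can overwrite it instead (a small sketch):

    // Overwrite an existing model directory instead of failing
    model.write.overwrite().save("file:///d:/ml/lr")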
    

    4. Wine quality classification with logistic regression

    import org.apache.spark.SparkConf
    import org.apache.spark.ml.classification.LogisticRegression
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.sql.SparkSession
    
    /**
      * Use logistic regression on the white wine data to classify each wine as good or bad.
      */
    object MLLogicRegressWhiteWinDemo1 {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setAppName("ml_linearRegress")
        conf.setMaster("local[*]")
    
        // Create the SparkSession
        val spark = SparkSession.builder().config(conf).getOrCreate()
    
        // Load the white wine CSV file
        val file = "file:///d:/ml/data/white.csv"
        val rdd1 = spark.sparkContext.textFile(file)
        val rdd2 = rdd1.map(line=>{
          val arr = line.split(";")
          (
            // Label: 1.0 if quality > 7 (good wine), else 0.0
            if (arr(11).toDouble > 7) 1.0 else 0.0 ,
            Vectors.dense(
              arr(0).toDouble ,
              arr(1).toDouble ,
              arr(2).toDouble ,
              arr(3).toDouble ,
              arr(4).toDouble ,
              arr(5).toDouble ,
              arr(6).toDouble ,
              arr(7).toDouble ,
              arr(8).toDouble ,
              arr(9).toDouble ,
              arr(10).toDouble)
          )
        })
        import spark.implicits._
        val df = rdd2.toDF("label" , "features")
    
        // Randomly split into training and test data; 0.8 and 0.2 are the split weights
        val split = df.randomSplit(Array(0.8 , 0.2))
    
        // Training data
        val trainData = split(0)
        // Test data
        val testData = split(1)
    
        df.show(100 ,false)
    
        // Create the logistic regression estimator
        val lr = new LogisticRegression()
        lr.setMaxIter(10).setRegParam(0.01)
    
        // Fit the model
        val model = lr.fit(trainData)
    
        val testResult = model.transform(testData)
        testResult.show(100, false)
      }
    }
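
    The transform output contains rawPrediction, probability and prediction columns. To reduce the held-out split to a single metric, an evaluator can be applied to testResult inside main (a sketch, not part of the original listing):

    import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

    // Area under ROC on the 20% test split (add before the end of main)
    val evaluator = new BinaryClassificationEvaluator()
      .setLabelCol("label")
      .setRawPredictionCol("rawPrediction")
      .setMetricName("areaUnderROC")
    println("Test AUC = " + evaluator.evaluate(testResult))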
    

    5. Classification with Naive Bayes

    import org.apache.spark.ml.classification.NaiveBayes
    import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
    import org.apache.spark.sql.SparkSession
    
    object NaiveBayesExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
        .builder
        .master("local[*]")
        .appName("NaiveBayesExample")
        .getOrCreate()
    
        // Load the data stored in LIBSVM format as a DataFrame.
        val file = "file:///D:/downloads/bigdata/spark-2.1.0-bin-hadoop2.7/data/mllib/sample_libsvm_data.txt"
        val data = spark.read.format("libsvm").load(file)
    
        // Split the data into training and test sets
        val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3), seed = 1234L)
    
        // Train a Naive Bayes model
        val model = new NaiveBayes().fit(trainingData)
    
        // Apply the model to the test data
        val predictions = model.transform(testData)
        // Show the predictions
        predictions.show()
    
        val evaluator = new MulticlassClassificationEvaluator()
        .setLabelCol("label")
        .setPredictionCol("prediction")
        .setMetricName("accuracy")
        val accuracy = evaluator.evaluate(predictions)
        // Print the accuracy
        println("Test set accuracy = " + accuracy)
    
        spark.stop()
      }
    }
    

    6. Spam filtering

    6.1 Scala implementation

    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.feature.HashingTF
    import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
    val spam = sc.textFile("spam.txt")
    val normal = sc.textFile("normal.txt")
    // Hash term frequencies into a 10,000-dimensional feature vector
    val tf = new HashingTF(numFeatures = 10000)
    
    // Feature vectors for spam emails
    val spamFeatures = spam.map(email => tf.transform(email.split(" ")))
    // Feature vectors for normal emails
    val normalFeatures = normal.map(email => tf.transform(email.split(" ")))
    
    // Positive examples (spam, label 1)
    val positiveExamples = spamFeatures.map(features => LabeledPoint(1, features))
    // Negative examples (normal, label 0)
    val negativeExamples = normalFeatures.map(features => LabeledPoint(0, features))
    
    // Combine into one training set
    val trainingData = positiveExamples.union(negativeExamples)
    trainingData.cache()
    // Run Logistic Regression using the SGD algorithm.
    
    val model = new LogisticRegressionWithSGD().run(trainingData)
    // Test on a positive example (spam) and a negative one (normal).
    val posTest = tf.transform(
    "O M G GET cheap stuff by sending money to ...".split(" "))
    val negTest = tf.transform(
    "Hi Dad, I started studying Spark the other ...".split(" "))
    println("Prediction for positive test example: " + model.predict(posTest))
    println("Prediction for negative test example: " + model.predict(negTest))
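
    LogisticRegressionWithSGD belongs to the older RDD-based org.apache.spark.mllib API; the DataFrame-based LogisticRegression used in sections 4 and 10 is the recommended replacement in Spark 2.x. By default predict returns the 0/1 class decided by a 0.5 threshold; clearing the threshold makes it return the raw score instead (a small sketch against the model trained above):

    // Clear the decision threshold so predict returns the raw score
    model.clearThreshold()
    println("Score for positive test example: " + model.predict(posTest))
    println("Score for negative test example: " + model.predict(negTest))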
    

    6.2 Java implementation

    import java.util.Arrays;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.mllib.classification.LogisticRegressionModel;
    import org.apache.spark.mllib.classification.LogisticRegressionWithSGD;
    import org.apache.spark.mllib.feature.HashingTF;
    import org.apache.spark.mllib.linalg.Vector;
    import org.apache.spark.mllib.regression.LabeledPoint;

    JavaRDD<String> spam = sc.textFile("spam.txt");
    JavaRDD<String> normal = sc.textFile("normal.txt");

    // Hash term frequencies into a 10,000-dimensional feature vector
    final HashingTF tf = new HashingTF(10000);

    // Positive examples (spam, label 1)
    JavaRDD<LabeledPoint> posExamples = spam.map(new Function<String, LabeledPoint>() {
      public LabeledPoint call(String email) {
        return new LabeledPoint(1, tf.transform(Arrays.asList(email.split(" "))));
      }
    });
    // Negative examples (normal, label 0)
    JavaRDD<LabeledPoint> negExamples = normal.map(new Function<String, LabeledPoint>() {
      public LabeledPoint call(String email) {
        return new LabeledPoint(0, tf.transform(Arrays.asList(email.split(" "))));
      }
    });
    // Combine into one training set
    JavaRDD<LabeledPoint> trainData = posExamples.union(negExamples);
    trainData.cache(); // Cache since Logistic Regression is an iterative algorithm.
    // Run Logistic Regression using the SGD algorithm.
    LogisticRegressionModel model = new LogisticRegressionWithSGD().run(trainData.rdd());
    // Test on a positive example (spam) and a negative one (normal).
    Vector posTest = tf.transform(
        Arrays.asList("O M G GET cheap stuff by sending money to ...".split(" ")));
    Vector negTest = tf.transform(
        Arrays.asList("Hi Dad, I started studying Spark the other ...".split(" ")));
    System.out.println("Prediction for positive example: " + model.predict(posTest));
    System.out.println("Prediction for negative example: " + model.predict(negTest));
    

    7. K-means clustering

    
    import org.apache.spark.ml.clustering.KMeans
    import org.apache.spark.sql.SparkSession
    
    /**
      * K-means clustering example
      */
    object KMeansExample {
    
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
        .builder
        .appName("kmean")
        .getOrCreate()
    
        val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
    
        // Train a k-means model with k = 2
        val kmeans = new KMeans().setK(2).setSeed(1L)
        val model = kmeans.fit(dataset)
    
        // Compute the within-set sum of squared errors (WSSSE)
        val WSSSE = model.computeCost(dataset)
        println(s"Within Set Sum of Squared Errors = $WSSSE")
    
        // Shows the result.
        println("Cluster Centers: ")
        model.clusterCenters.foreach(println)
        spark.stop()
      }
    }
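
    The example prints the cluster centers but never shows which cluster each row is assigned to. Because the fitted KMeansModel is a Transformer, the assignments can be read from the prediction column it adds (a sketch to be placed before spark.stop()):

    // Assign each input row to a cluster; adds a "prediction" column
    val predictions = model.transform(dataset)
    predictions.show(false)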
    
    

    8. Tokenization

    8.1 English tokenization

    import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}
    import org.apache.spark.sql.functions._
    
    import org.apache.spark.sql.SparkSession
    
    object TokenizerExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder
          .appName("TokenizerExample")
          .getOrCreate()
    
        val sentenceDataFrame = spark.createDataFrame(Seq(
          (0, "Hi I heard about Spark"),
          (1, "I wish Java could use case classes"),
          (2, "Logistic,regression,models,are,neat")
        )).toDF("id", "sentence")
    
        val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
        val regexTokenizer = new RegexTokenizer()
          .setInputCol("sentence")
          .setOutputCol("words")
          .setPattern("\\W") // split on any non-word character
    
        val countTokens = udf { (words: Seq[String]) => words.length }
    
        val tokenized = tokenizer.transform(sentenceDataFrame)
        tokenized.select("sentence", "words")
            .withColumn("tokens", countTokens(col("words"))).show(false)
    
        val regexTokenized = regexTokenizer.transform(sentenceDataFrame)
        regexTokenized.select("sentence", "words")
            .withColumn("tokens", countTokens(col("words"))).show(false)
    
        spark.stop()
      }
    }
    
    

    8.2 Chinese word segmentation with the word library

    1. Add the Maven dependency

      <dependency>
        <groupId>org.apdplat</groupId>
        <artifactId>word</artifactId>
        <version>1.3</version>
      </dependency>
      
    2. Segment text with the API

      import org.apdplat.word.WordSegmenter;
      ...
      List<Word> words = WordSegmenter.seg("南京市的长江大桥是最长的大桥");
      

    3. Segmentation that keeps stop words

      List<Word> words = WordSegmenter.segWithStopWords("南京市的长江大桥是最长的大桥");
      

    4. Custom stop-word dictionary

      1. Create mystop.txt under the resources directory

        [resources/mystop.txt]

        南京市
        大桥
        

      2. Point the segmenter at the stop-word file programmatically (a Spark integration sketch follows this list)

        WordConfTools.set("stopwords.path", "classpath:mystop.txt");
        // After changing the dictionary path, reload the dictionaries
        DictionaryFactory.reload();
        List<Word> words = WordSegmenter.seg("南京市的长江大桥是最长的大桥");
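
    To call the segmenter from inside a Spark job, it can be wrapped in a UDF. A sketch, assuming the word dependency is on the executor classpath, that each Word exposes its text via getText(), and a DataFrame sentenceDF with a "sentence" column (both names are illustrative):

      import scala.collection.JavaConverters._
      import org.apache.spark.sql.functions.{col, udf}
      import org.apdplat.word.WordSegmenter

      // UDF that segments a Chinese sentence into a sequence of tokens
      val segment = udf { (text: String) =>
        WordSegmenter.seg(text).asScala.map(_.getText).toSeq
      }
      val tokenized = sentenceDF.withColumn("words", segment(col("sentence")))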
        

    9. TF-IDF example

    import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
    import org.apache.spark.sql.SparkSession
    
    object TfIdfExample {
    
      def main(args: Array[String]) {
        val spark = SparkSession
          .builder
          .appName("TfIdfExample")
          .getOrCreate()
    
        val sentenceData = spark.createDataFrame(Seq(
          (0.0, "Hi I heard about Spark"),
          (0.0, "I wish Java could use case classes"),
          (1.0, "Logistic regression models are neat")
        )).toDF("label", "sentence")
    
        val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
        val wordsData = tokenizer.transform(sentenceData)
    
        val hashingTF = new HashingTF()
          .setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)
    
        val featurizedData = hashingTF.transform(wordsData)
        val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
        /*
         * Note: Spark computes IDF as
         *   idf(t) = ln( (D + 1) / (df(t) + 1) )
         * where D is the total number of documents and df(t) is the
         * number of documents that contain term t.
         */
        val idfModel = idf.fit(featurizedData)
    
        val rescaledData = idfModel.transform(featurizedData)
        rescaledData.select("label", "features").show()
    
        spark.stop()
      }
    }
    

    Note:

    The IDF formula varies slightly between implementations: some take the logarithm base 10, some the natural logarithm, and some add 1 only to the denominator. Spark adds 1 to both the numerator and the denominator.
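
    As a worked example of Spark's variant: with the three sentences above (D = 3), a term that appears in exactly one of them gets

        idf(t) = ln((D + 1) / (df(t) + 1)) = ln(4 / 2) = ln 2 ≈ 0.693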

    10. Pipelines

    import org.apache.spark.ml.{Pipeline, PipelineModel}
    import org.apache.spark.ml.classification.LogisticRegression
    import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
    import org.apache.spark.ml.linalg.Vector
    import org.apache.spark.sql.Row
    
    import org.apache.spark.sql.SparkSession
    
    object PipelineExample {
    
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder
          .appName("PipelineExample")
          .getOrCreate()
    
        val training = spark.createDataFrame(Seq(
          (0L, "a b c d e spark", 1.0),
          (1L, "b d", 0.0),
          (2L, "spark f g h", 1.0),
          (3L, "hadoop mapreduce", 0.0)
        )).toDF("id", "text", "label")
    
        val tokenizer = new Tokenizer()
          .setInputCol("text")
          .setOutputCol("words")
        val hashingTF = new HashingTF()
          .setNumFeatures(1000)
          .setInputCol(tokenizer.getOutputCol)
          .setOutputCol("features")
        val lr = new LogisticRegression()
          .setMaxIter(10)
          .setRegParam(0.001)
        val pipeline = new Pipeline()
          .setStages(Array(tokenizer, hashingTF, lr))
    
        val model = pipeline.fit(training)
    
        model.write.overwrite().save("/tmp/spark-logistic-regression-model")
    
        pipeline.write.overwrite().save("/tmp/unfit-lr-model")
    
        val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")
    
        val test = spark.createDataFrame(Seq(
          (4L, "spark i j k"),
          (5L, "l m n"),
          (6L, "spark hadoop spark"),
          (7L, "apache hadoop")
        )).toDF("id", "text")
    
        // Make predictions on the test documents.
        model.transform(test)
          .select("id", "text", "probability", "prediction")
          .collect()
          .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
            println(s"($id, $text) --> prob=$prob, prediction=$prediction")
          }
        spark.stop()
      }
    }
    
  • Original article: https://www.cnblogs.com/xupccc/p/9544638.html