  • Spark Machine Learning

    Machine learning algorithms are available both in Mahout on Spark and in Spark MLlib.

    This post focuses on summarizing MLlib.

    Classification and Regression

    Classification and regression are supervised learning methods.

    Supervised learning means training on labeled data (LabeledPoint) to obtain a model, then predicting results for test data. Labeled data is feature data whose outcome is already known.
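
    For example, a minimal sketch of constructing labeled data (the values are hypothetical):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    // A labeled example: the known result (label) plus its feature vector
    val example = LabeledPoint(1.0, Vectors.dense(2.0, 0.5, 3.1))
    println(example.label)    // 1.0
    println(example.features) // [2.0,0.5,3.1]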

    The difference between classification and regression is the type of the predicted variable:

      Classification predicts a discrete variable (e.g., classifying email as spam or not spam). For binary classification the labels are 0 and 1; for multiclass classification the labels range from 0 to C-1, where C is the number of classes.

      Regression predicts a continuous variable (e.g., predicting height from age and weight).

    Linear Regression

      Linear regression is one of the most commonly used regression methods: the output value is predicted as a linear combination of the features.

      Classes available for linear regression:

        LinearRegressionWithSGD
        RidgeRegressionWithSGD
        LassoWithSGD

        RidgeRegressionWithSGD uses L2 regularization;
        LassoWithSGD uses L1 regularization (see the sketch below).
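
        A minimal sketch of training the ridge and lasso variants with an explicit regularization parameter (assuming parsedData is an RDD[LabeledPoint], as in the example further down):

        import org.apache.spark.mllib.regression.{LassoWithSGD, RidgeRegressionWithSGD}

        // train(input, numIterations, stepSize, regParam)
        val ridgeModel = RidgeRegressionWithSGD.train(parsedData, 100, 1.0, 0.01) // L2 penalty
        val lassoModel = LassoWithSGD.train(parsedData, 100, 1.0, 0.01)           // L1 penalty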

      Parameters:

        stepSize: the step size (learning rate) for gradient descent

        numIterations: the number of iterations

        intercept: whether to add an intercept (bias) feature, i.e. a feature whose value is always 1; set with setIntercept(), defaults to false

        Default values: {stepSize: 1.0, numIterations: 100, miniBatchFraction: 1.0}
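
        These parameters can be set explicitly through the algorithm's optimizer; a minimal sketch (the values shown are the defaults above):

        import org.apache.spark.mllib.regression.LinearRegressionWithSGD

        val lr = new LinearRegressionWithSGD().setIntercept(true)
        lr.optimizer
          .setStepSize(1.0)          // gradient descent step size
          .setNumIterations(100)     // number of iterations
          .setMiniBatchFraction(1.0) // fraction of the data used per iteration
        // val model = lr.run(parsedData)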

      Using the model:

        1. Predict on new data with model.predict()

        2. Get the feature weights with model.weights

      Model evaluation:

        Mean squared error (MSE)

    Example:

    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.regression.LinearRegressionModel
    import org.apache.spark.mllib.regression.LinearRegressionWithSGD
    import org.apache.spark.mllib.linalg.Vectors
    
    /**
      * Created by Edward on 2016/9/21.
      */
    object LinearRegression {
      def main(args: Array[String]) {
        val conf: SparkConf = new SparkConf().setAppName("LinearRegression").setMaster("local")
        val sc = new SparkContext(conf)
    
        // Load and parse the data
        val data = sc.textFile("data/mllib/ridge-data/lpsa.data")
        val parsedData = data.map { line =>
          val parts = line.split(',')
          LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
        }.cache()
    
        // Building the model
        val numIterations = 100
        val model = LinearRegressionWithSGD.train(parsedData, numIterations)
    //    var lr = new LinearRegressionWithSGD().setIntercept(true)
    //    val model = lr.run(parsedData)
    
        // Get the feature weights and the intercept
        println("weights:%s, intercept:%s".format(model.weights,model.intercept))
    
        // Evaluate model on training examples and compute training error
        val valuesAndPreds = parsedData.map { point =>
          val prediction = model.predict(point.features)
          (point.label, prediction)
        }
    
        // Compute the mean squared error
        val MSE = valuesAndPreds.map{case(v, p) => math.pow((v - p), 2)}.mean()
        println("training Mean Squared Error = " + MSE)
    
        // Save and load model
        model.save(sc, "myModelPath")
        val sameModel = LinearRegressionModel.load(sc, "myModelPath")
    
    
      }
    }

    Data:

    -0.4307829,-1.63735562648104 -2.00621178480549 -1.86242597251066 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
    -0.1625189,-1.98898046126935 -0.722008756122123 -0.787896192088153 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
    -0.1625189,-1.57881887548545 -2.1887840293994 1.36116336875686 -1.02470580167082 -0.522940888712441 -0.863171185425945 0.342627053981254 -0.155348103855541
    -0.1625189,-2.16691708463163 -0.807993896938655 -0.787896192088153 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
    0.3715636,-0.507874475300631 -0.458834049396776 -0.250631301876899 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
    0.7654678,-2.03612849966376 -0.933954647105133 -1.86242597251066 -1.02470580167082 -0.522940888712441 -0.863171185425945 -1.04215728919298 -0.864466507337306
    ...

    The first column is the label (the known result); the remaining columns are the feature values.

    Prediction means supplying a new set of feature values and predicting the result.
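
    For instance, a minimal sketch of predicting on a new (hypothetical) feature vector with the trained model:

    import org.apache.spark.mllib.linalg.Vectors

    // Hypothetical values for the 8 features of this dataset
    val newFeatures = Vectors.dense(-0.5, -0.4, -0.2, -1.0, -0.5, -0.8, -1.0, -0.8)
    println("predicted label = " + model.predict(newFeatures))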

    Output:

    weights:[0.5808575763272221,0.18930001482946976,0.2803086929991066,0.1110834181777876,0.4010473965597895,-0.5603061626684255,-0.5804740464000981,0.8742741176970946], intercept:0.0
    training Mean Squared Error = 6.207597210613579

    Logistic Regression

    Logistic regression is a binary classification method that can also be extended to multiclass classification.

      Classes available for logistic regression:

      LogisticRegressionWithLBFGS (recommended)

      LogisticRegressionWithSGD

      Parameters:

      Similar to linear regression.

      Using the model:

      1. Predict on new data with model.predict()
      2. Get the feature weights with model.weights

      Model evaluation:

      Binary classification: AUC (Area Under the ROC Curve)

      

    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionModel}
    import org.apache.spark.mllib.evaluation.{BinaryClassificationMetrics, MulticlassMetrics}
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.util.MLUtils
    
    /**
      * Created by Edward on 2016/9/21.
      */
    object LogisticRegression {
      def main(args: Array[String]) {
        val conf: SparkConf = new SparkConf().setAppName("LogisticRegression").setMaster("local")
        val sc: SparkContext = new SparkContext(conf)
    
        // Load training data in LIBSVM format.
        val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
    
        // Split data into training (60%) and test (40%).
        val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
        val training = splits(0).cache()
        val test = splits(1)
    
        // Run training algorithm to build the model
        val model = new LogisticRegressionWithLBFGS()
          .setNumClasses(10)
          .run(training)
    
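        // Only predict class 1 when its probability exceeds 0.8
        // (thresholds apply to binary models; multiclass predict() ignores them)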
        model.setThreshold(0.8)
    
        // Compute raw scores on the test set.
        val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
          val prediction = model.predict(features)
          (prediction, label)
        }
    
        // Multiclass metrics
        // Get evaluation metrics.
        //val metrics = new MulticlassMetrics(predictionAndLabels)
        //val precision = metrics.precision
        //println("Precision = " + precision)
    
    
        // Binary classification metrics
        val metrics = new BinaryClassificationMetrics(predictionAndLabels)
        // Evaluate the model by the area under the receiver operating characteristic (ROC) curve;
        // values close to 1 indicate a good model
        val auROC: Double = metrics.areaUnderROC()
        println("Area under ROC = " + auROC)
        // Evaluate the model by the area under the precision-recall (PR) curve; values close to 1 are better
        val underPR: Double = metrics.areaUnderPR()
        println("Area under PR = " + underPR)
    
        // Save and load model
        model.save(sc, "myModelPath")
        val sameModel = LogisticRegressionModel.load(sc, "myModelPath")
    
      }
    
    }
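
    Note: with a threshold set, predict() returns class labels rather than raw scores, which weakens the ROC/PR evaluation above. For a binary model (the default numClasses of 2), clearing the threshold makes predict() return raw probabilities, which is what BinaryClassificationMetrics expects; a minimal sketch:

    // Clear the threshold so predict() returns raw scores instead of 0/1 labels
    model.clearThreshold()
    val rawScoresAndLabels = test.map { case LabeledPoint(label, features) =>
      (model.predict(features), label)
    }
    val binaryMetrics = new BinaryClassificationMetrics(rawScoresAndLabels)
    println("Area under ROC (raw scores) = " + binaryMetrics.areaUnderROC())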

    Support Vector Machines (SVMs)

      A classification algorithm, specifically for binary classification.

      Similar to binary classification with logistic regression.

    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
    import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
    import org.apache.spark.mllib.util.MLUtils
    
    /**
      * Created by Edward on 2016/9/21.
      */
    object SVMs {
      def main(args: Array[String]) {
    
        val conf: SparkConf = new SparkConf().setAppName("SVM").setMaster("local")
        val sc: SparkContext = new SparkContext(conf)
    
    
        // Load training data in LIBSVM format.
        val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
    
    
        // Split data into training (60%) and test (40%).
        val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
        val training = splits(0).cache()
        val test = splits(1)
    
        // Run training algorithm to build the model
        val numIterations = 100
        val model = SVMWithSGD.train(training, numIterations)
    
        // Clear the default threshold.
        model.clearThreshold()
    
        // Compute raw scores on the test set.
        val scoreAndLabels = test.map { point =>
          println("feature="+point.features)
          val score = model.predict(point.features)
          (score, point.label)
        }
    
        scoreAndLabels.foreach(println(_))
    
        // Get evaluation metrics.
        val metrics = new BinaryClassificationMetrics(scoreAndLabels)
    
        println("metrics="+metrics)
    
        val auROC = metrics.areaUnderROC()
    
        println("Area under ROC = " + auROC)
    
        // Save and load model
        model.save(sc, "myModelPath")
        val sameModel = SVMModel.load(sc, "myModelPath")
    
        sc.stop()
    
      }
    
    }

    Data (LIBSVM format: each line is a label followed by index:value pairs of the non-zero features):

    0 128:51 129:159 130:253 131:159 132:50 155:48 156:238 157:252 158:252 159:252 160:237 182:54 183:227 184:253 185:252 186:239 187:233 188:252 189:57 190:6 208:10 209:60 210:224
    1 159:124 160:253 161:255 162:63 186:96 187:244 188:251 189:253 190:62 214:127 215:251 216:251 217:253 218:62 
    ...

    Collaborative Filtering

      Collaborative filtering in Spark is implemented mainly by alternating least squares (ALS).

      Parameters:

      numBlocks: the number of blocks, used to control parallelism

      rank: the number of latent factors (the size of the feature vectors)

      iterations: the number of iterations

      lambda: the regularization parameter

      alpha: a constant used to compute confidence in implicit-feedback ALS

      Method:

      ALS.train
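
      The alpha parameter applies only to implicit-feedback ALS. A minimal sketch of the implicit variant, assuming the ratings hold interaction counts (views, clicks) rather than explicit ratings:

      import org.apache.spark.mllib.recommendation.ALS

      // trainImplicit(ratings, rank, iterations, lambda, alpha)
      val implicitModel = ALS.trainImplicit(ratings, 10, 10, 0.01, 1.0)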

      Model evaluation:

      Mean squared error

    Example:

      

    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.mllib.recommendation.ALS
    import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
    import org.apache.spark.mllib.recommendation.Rating
    
    /**
      * Created by Edward on 2016/9/22.
      */
    object CollaborativeALS {
      def main(args: Array[String]) {
    
    
        val conf: SparkConf = new SparkConf().setAppName("CollaborativeALS").setMaster("local")
        val sc: SparkContext = new SparkContext(conf)
        // Load and parse the data
        val data = sc.textFile("data/mllib/als/test.data")
        val ratings = data.map(_.split(',') match { case Array(user, item, rate) =>
          Rating(user.toInt, item.toInt, rate.toDouble)
        })
    
        // Build the recommendation model using ALS
        val rank = 10
        val numIterations = 10
        val model = ALS.train(ratings, rank, numIterations, 0.01)
    
        // Evaluate the model on rating data
        val usersProducts = ratings.map { case Rating(user, product, rate) =>
          (user, product)
        }
        val predictions =
          model.predict(usersProducts).map { case Rating(user, product, rate) =>
            ((user, product), rate)
          }
        val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
          ((user, product), rate)
        }.join(predictions)
        val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
          val err = (r1 - r2)
          err * err
        }.mean()
    
        // Mean squared error
        println("Mean Squared Error = " + MSE)
    
        // Save and load model
        model.save(sc, "target/tmp/myCollaborativeFilter")
        val sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
    
    
      }
    
    }
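
    Once trained, the model can also produce top-N recommendations; a minimal sketch (user id 1 is hypothetical):

    // Top 3 product recommendations for user 1
    val topRecs = model.recommendProducts(1, 3)
    topRecs.foreach { case Rating(user, product, score) =>
      println(s"user=$user product=$product score=$score")
    }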

    To be continued...
