Spark Machine Learning Notes (1)

    Spark's machine learning library currently offers two API surfaces: RDD-based and DataFrame-based. According to the Spark website, the RDD-based APIs entered maintenance mode in Spark 2.0; the primary machine learning API is now the DataFrame-based API in the spark.ml package, and the RDD-based APIs are expected to be removed entirely in Spark 3.0.

    After two weeks of learning Spark MLlib, I decided to move over to the DataFrame-based interface. Since the existing documentation and tutorials all cover the RDD-based interface, I went digging through the Spark MLlib source code. The DataFrame-based API lives in the org.apache.spark.ml package; its main class hierarchy is described below.

    Let's start with a linear regression example, examples/ml/LinearRegressionExample.scala, which first constructs a LinearRegression object:

    val lir = new LinearRegression()
              .setFeaturesCol("features")
              .setLabelCol("label")
              .setRegParam(params.regParam)
              .setElasticNetParam(params.elasticNetParam)
              .setMaxIter(params.maxIter)
              .setTol(params.tol)

    Then fit is called on the training data, returning a trained model, lirModel, which is an instance of LinearRegressionModel.

    val lirModel = lir.fit(training)

    At this point the MLlib workflow is clear, and it mirrors many single-machine machine learning libraries: define a model and set its parameters, fit it to the training data, and get back a trained model.

    Now let's look up LinearRegression and LinearRegressionModel in the source; their class relationships work out as follows:

    LinearRegression is a Predictor and LinearRegressionModel is a Model: the Predictor is the learning algorithm, and the Model is what training produces. Alongside these there is a family of classes deriving from Params, which represent parameters; a Predictor and its Model share the same set of parameters.
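    To make that relationship concrete, here is a minimal, self-contained sketch of the Predictor/Model/Params pattern. This is illustration code only, not Spark's actual class definitions; all the My-prefixed names are made up for this sketch:

    // Simplified illustration of the spark.ml pattern: a shared params trait,
    // a "Predictor" that trains, and a "Model" that predicts. Not Spark source.
    trait MyParams {
      def regParam: Double
    }

    // The Predictor side: holds the parameters and implements fit().
    class MyLinearRegression(val regParam: Double = 0.0) extends MyParams {
      def fit(data: Seq[(Double, Array[Double])]): MyLinearRegressionModel = {
        val numFeatures = data.head._2.length
        // Real training would optimize the weights; zeros keep the sketch short.
        new MyLinearRegressionModel(regParam, Array.fill(numFeatures)(0.0))
      }
    }

    // The Model side: shares the same parameters and implements predict().
    class MyLinearRegressionModel(val regParam: Double, val weights: Array[Double]) extends MyParams {
      def predict(features: Array[Double]): Double =
        weights.zip(features).map { case (w, x) => w * x }.sum
    }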

    Now let's run a first end-to-end machine learning example with Spark MLlib. The data is regression data I had saved earlier in a txt file: about 5.5 million rows and 13 columns, where the first column is the label and the remaining twelve are the features. I will demonstrate both interfaces, starting with the old one:

    1. Read the raw data:

    scala> import org.apache.spark.mllib.linalg._
    import org.apache.spark.mllib.linalg._
    scala> import org.apache.spark.mllib.regression._
    import org.apache.spark.mllib.regression._      
    scala> val raw_data = sc.textFile("data/my/y_x.txt")
    raw_data: org.apache.spark.rdd.RDD[String] = data/my/y_x.txt MapPartitionsRDD[1] at textFile at <console>:30

    2. Convert the format; the RDD-based interface takes LabeledPoint as its input type:

    scala> val data = raw_data.map{ line =>
         | val arr = line.split(' ').map(_.toDouble)
         | val label = arr.head
     | val features = Vectors.dense(arr.tail)
     | LabeledPoint(label,features)
         | }
    data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[2] at map at <console>:32

    3. Split into train and test sets:

    scala> val splits = data.randomSplit(Array(0.8, 0.2))
    splits: Array[org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint]] = Array(MapPartitionsRDD[3] at randomSplit at <console>:34, MapPartitionsRDD[4] at randomSplit at <console>:34)
    scala> val train_set = splits(0).cache
    train_set: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[3] at randomSplit at <console>:34
    scala> val test_set = splits(1).cache
    test_set: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[4] at randomSplit at <console>:34
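    A note on reproducibility: randomSplit is, as the name says, random, so the 80/20 split above differs between sessions. If you need a repeatable split, you can pass an explicit seed (the value 42 below is arbitrary):

    val splits = data.randomSplit(Array(0.8, 0.2), seed = 42L)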

    4. Train a model with LinearRegressionWithSGD.train:

    scala> val lr = LinearRegressionWithSGD.train(train_set,100,0.0001)
    warning: there was one deprecation warning; re-run with -deprecation for details
    16/08/26 09:20:44 WARN Executor: 1 block locks were not released by TID = 0:
    [rdd_3_0]
    lr: org.apache.spark.mllib.regression.LinearRegressionModel = org.apache.spark.mllib.regression.LinearRegressionModel: intercept = 0.0, numFeatures = 12
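    Note the intercept = 0.0 in the output: the static train helper does not fit an intercept term. If you want an intercept, or finer control over the SGD optimizer, you can construct the algorithm object directly. A sketch (the iteration count and step size simply mirror the values passed to train above):

    val alg = new LinearRegressionWithSGD()   // deprecated in 2.0, like train()
    alg.setIntercept(true)                    // also fit an intercept term
    alg.optimizer
      .setNumIterations(100)
      .setStepSize(0.0001)
    val model = alg.run(train_set)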

    5. Evaluate the model:

    scala> val pred_labels = test_set.map(lp => (lp.label, lr.predict(lp.features))) 
    pred_labels: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[17] at map at <console>:42
    scala> val mse = pred_labels.map{case (p,v) => math.pow(p-v,2)}.mean
    mse: Double = 0.05104150735910074
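    The same evaluation can also be done with MLlib's built-in RegressionMetrics helper, which computes several metrics in one pass. A sketch reusing the pred_labels RDD from above (the constructor expects (prediction, observation) pairs, hence the swap):

    import org.apache.spark.mllib.evaluation.RegressionMetrics
    val metrics = new RegressionMetrics(pred_labels.map { case (label, pred) => (pred, label) })
    println(s"MSE = ${metrics.meanSquaredError}, RMSE = ${metrics.rootMeanSquaredError}, R2 = ${metrics.r2}")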

    Now the same exercise with the new interface:

    1. Read the raw data:

    scala> import org.apache.spark.ml.linalg._
    import org.apache.spark.ml.linalg._
    scala> import org.apache.spark.ml.regression._
    import org.apache.spark.ml.regression._
    scala> import org.apache.spark.sql._
    import org.apache.spark.sql._
    scala> val raw_data = spark.read.text("data/my/y_x.txt")
    raw_data: org.apache.spark.sql.DataFrame = [value: string]

    2. Convert the data:

    scala> val data = raw_data.rdd.map { case Row(line:String) => 
         | val arr = line.split(' ').map(_.toDouble)
         | val label = arr.head
         | val features = Vectors.dense(arr.tail)
         | (label,features)
         | }
    data: org.apache.spark.rdd.RDD[(Double, org.apache.spark.ml.linalg.Vector)] = MapPartitionsRDD[4] at map at <console>:34

    3. Split the dataset:

    scala> val splits = data.randomSplit(Array(0.8, 0.2))
    splits: Array[org.apache.spark.rdd.RDD[(Double, org.apache.spark.ml.linalg.Vector)]] = Array(MapPartitionsRDD[5] at randomSplit at <console>:36, MapPartitionsRDD[6] at randomSplit at <console>:36)
    scala> val train_set = splits(0).toDS.cache
    train_set: org.apache.spark.sql.Dataset[(Double, org.apache.spark.ml.linalg.Vector)] = [_1: double, _2: vector]
    scala> val test_set = splits(1).toDS.cache
    test_set: org.apache.spark.sql.Dataset[(Double, org.apache.spark.ml.linalg.Vector)] = [_1: double, _2: vector]

    4. Create a LinearRegression object and set the model parameters. Here we have to set the label and features columns, which default to "label" and "features", whereas our Dataset's columns are named "_1" and "_2" (an alternative that avoids this is sketched after the listing below):

    scala> val lir = new LinearRegression
    lir: org.apache.spark.ml.regression.LinearRegression = linReg_c4e70a01bcd3
    scala> lir.setFeaturesCol("_2")
    res0: org.apache.spark.ml.regression.LinearRegression = linReg_c4e70a01bcd3
    scala> lir.setLabelCol("_1")
    res1: org.apache.spark.ml.regression.LinearRegression = linReg_c4e70a01bcd3
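    Alternatively, you can name the columns up front when building the Dataset, so the defaults "label" and "features" just work and the two setter calls above become unnecessary. A sketch (toDF is available in the shell because spark.implicits._ is imported automatically):

    val train_df = splits(0).toDF("label", "features").cache
    val lir2 = new LinearRegression()   // no setLabelCol/setFeaturesCol needed
    val model2 = lir2.fit(train_df)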

    5. Train the model:

    val model = lir.fit(train_set)
    16/08/26 09:45:16 WARN Executor: 1 block locks were not released by TID = 0:
    [rdd_9_0]
    16/08/26 09:45:16 WARN WeightedLeastSquares: regParam is zero, which might cause numerical instability and overfitting.
    model: org.apache.spark.ml.regression.LinearRegressionModel = linReg_c4e70a01bcd3

    6. Evaluate the model:

    scala> val res = model.transform(test_set)
    res: org.apache.spark.sql.DataFrame = [_1: double, _2: vector ... 1 more field]
    scala> import org.apache.spark.ml.evaluation._
    import org.apache.spark.ml.evaluation._
    scala> val eva = new RegressionEvaluator
    eva: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_8fc6cce63aa9
    scala> eva.setLabelCol("_1")
    res6: eva.type = regEval_8fc6cce63aa9
    scala> eva.setMetricName("mse")
    res7: eva.type = regEval_8fc6cce63aa9
    scala> eva.evaluate(res)
    res8: Double = 0.027933653533088666                 
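    RegressionEvaluator reads the "prediction" column that transform appended to the test set (predictionCol defaults to "prediction", so it did not need setting). Besides "mse", the supported metric names include "rmse" (the default), "r2" and "mae", so other metrics are one setter away:

    eva.setMetricName("r2")
    println(eva.evaluate(res))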