zoukankan      html  css  js  c++  java
  • 8.4 分类与回归

    一、逻辑斯蒂回归分类器

    逻辑斯蒂回归(logistic regression)是统计学习中的经典分类方法,属于对数线性模型。logistic回归的因变量可以是二分类的,也可以是多分类的。

    任务描述:以iris数据集(iris)为例进行分析(iris下载地址:http://dblab.xmu.edu.cn/blog/wp-content/uploads/2017/03/iris.txt)

    iris以鸢尾花的特征作为数据来源,数据集包含150个数据集,分为3类,每类50个数据,每个数据包含4个属性,是在数据挖掘、数据分类中非常常用的测试集、训练集。为了便于理解,这里主要用后两个属性(花瓣的长度和宽度)来进行分类。

    1.用二项逻辑斯蒂回归来解决二分类问题

    首先我们先取其中的后两类数据,用二项逻辑斯蒂回归进行二分类分析

    (1)导入需要的包

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.ml.linalg.{Vector,Vectors}
    import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
    import org.apache.spark.ml.{Pipeline,PipelineModel}
    import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer,HashingTF, Tokenizer}
    import org.apache.spark.ml.classification.LogisticRegression
    import org.apache.spark.ml.classification.LogisticRegressionModel
    import org.apache.spark.ml.classification.{BinaryLogisticRegressionSummary, LogisticRegression}
    import org.apache.spark.sql.functions;
    

    (2)读取数据,简要分析

    scala> import spark.implicits._
    import spark.implicits._
     
    scala> case class Iris(features: org.apache.spark.ml.linalg.Vector, label: String)
    defined class Iris
     
    scala> val data = spark.sparkContext.textFile("file:///usr/local/spark/iris.txt").map(_.split(",")).map(p => I
    ris(Vectors.dense(p(0).toDouble,p(1).toDouble,p(2).toDouble, p(3).toDouble), p(4
    ).toString())).toDF()
    data: org.apache.spark.sql.DataFrame = [features: vector, label: string]
    
    scala> data.show()
    +-----------------+-----------+
    | features| label|
    +-----------------+-----------+
    |[5.1,3.5,1.4,0.2]|Iris-setosa|
    |[4.9,3.0,1.4,0.2]|Iris-setosa|
    |[4.7,3.2,1.3,0.2]|Iris-setosa|
    |[4.6,3.1,1.5,0.2]|Iris-setosa|
    |[5.0,3.6,1.4,0.2]|Iris-setosa|
    |[5.4,3.9,1.7,0.4]|Iris-setosa|
    |[4.6,3.4,1.4,0.3]|Iris-setosa|
    |[5.0,3.4,1.5,0.2]|Iris-setosa|
    |[4.4,2.9,1.4,0.2]|Iris-setosa|
    |[4.9,3.1,1.5,0.1]|Iris-setosa|
    |[5.4,3.7,1.5,0.2]|Iris-setosa|
    |[4.8,3.4,1.6,0.2]|Iris-setosa|
    |[4.8,3.0,1.4,0.1]|Iris-setosa|
    |[4.3,3.0,1.1,0.1]|Iris-setosa|
    |[5.8,4.0,1.2,0.2]|Iris-setosa|
    |[5.7,4.4,1.5,0.4]|Iris-setosa|
    |[5.4,3.9,1.3,0.4]|Iris-setosa|
    |[5.1,3.5,1.4,0.3]|Iris-setosa|
    |[5.7,3.8,1.7,0.3]|Iris-setosa|
    |[5.1,3.8,1.5,0.3]|Iris-setosa|
    +-----------------+-----------+
    only showing top 20 rows
    

    因为我们现在处理的是2分类问题,所以我们不需要全部的3类数据,我们要从中选出两类的数据。

    首先把刚刚得到的数据注册成一个表iris,注册成这个表之后,我们就可以通过sql语句进行数据查询。  

    scala> data.createOrReplaceTempView("iris")
     
    scala> val df = spark.sql("select * from iris where label != 'Iris-setosa'")
    df: org.apache.spark.sql.DataFrame = [features: vector, label: string]
     
    scala> df.map(t => t(1)+":"+t(0)).collect().foreach(println)
    Iris-versicolor:[7.0,3.2,4.7,1.4]
    Iris-versicolor:[6.4,3.2,4.5,1.5]
    Iris-versicolor:[6.9,3.1,4.9,1.5]
    ……
    ……
    

    (3)构建ML的pipeline

    a.分别获取标签列和特征列,进行索引,并进行了重命名。

    scala> val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(df)
    labelIndexer: org.apache.spark.ml.feature.StringIndexerModel = strIdx_e53e67411169
    scala> val featureIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").fit(df)
    featureIndexer: org.apache.spark.ml.feature.VectorIndexerModel = vecIdx_53b988077b38
    

    b.接下来,我们把数据集随机分成训练集和测试集,其中训练集占70%  

    scala> val Array(trainingData, testData) = df.randomSplit(Array(0.7, 0.3))
    trainingData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, label: string]
    testData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, label: string]
    

    c.然后,我们设置logistic的参数,这里我们统一用setter的方法来设置,也可以用ParamMap来设置(具体的可以查看spark mllib的官网)。这里我们设置了循环次数为10次,正则化项为0.3等  

    scala> val lr = new LogisticRegression().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
    lr: org.apache.spark.ml.classification.LogisticRegression = logreg_692899496c23
    

    d.这里我们设置一个labelConverter,目的是把预测的类别重新转化成字符型的。  

    scala> val labelConverter = new IndexToString().setInputCol("prediction").setOut
    putCol("predictedLabel").setLabels(labelIndexer.labels)
    labelConverter: org.apache.spark.ml.feature.IndexToString = idxToStr_c204eafabf57
    

    e.构建pipeline,设置stage,然后调用fit()来训练模型  

    scala> val lrPipeline = new Pipeline().setStages(Array(labelIndexer, featureIndexer, lr, labelConverter))
    lrPipeline: org.apache.spark.ml.Pipeline = pipeline_eb1b201af1e0
     
    scala> val lrPipelineModel = lrPipeline.fit(trainingData)
    lrPipelineModel: org.apache.spark.ml.PipelineModel = pipeline_eb1b201af1e0
    

    f.pipeline本质上是一个Estimator,当pipeline调用fit()的时候就产生了一个PipelineModel,本质上是一个Transformer。然后这个PipelineModel就可以调用transform()来进行预测,生成一个新的DataFrame,即利用训练得到的模型对测试集进行验证。  

    scala> val lrPredictions = lrPipelineModel.transform(testData)
    lrPredictions: org.apache.spark.sql.DataFrame = [features: vector, label: string ... 6 more fields]
    

    g.最后我们可以输出预测的结果,其中select选择要输出的列,collect获取所有行的数据,用foreach把每行打印出来。其中打印出来的值依次分别代表该行数据的真实分类和特征值、预测属于不同分类的概率、预测的分类  

    scala> lrPredictions.select("predictedLabel", "label", "features", "probability").collect().foreach { case Row(predictedLabel: String, label: String, features: Vector, prob: Vector) => println(s"($label, $features) --> prob=$prob, predicted Label=$predictedLabel")}
    (Iris-virginica, [4.9,2.5,4.5,1.7]) --> prob=[0.4796551461409372,0.5203448538590628], predictedLabel=Iris-virginica
    (Iris-versicolor, [5.1,2.5,3.0,1.1]) --> prob=[0.5892626391059901,0.41073736089401], predictedLabel=Iris-versicolor
    (Iris-versicolor, [5.5,2.3,4.0,1.3]) --> prob=[0.5577310241453046,0.4422689758546954], predictedLabel=Iris-versicolor
    

    (4)模型评估  

    创建一个MulticlassClassificationEvaluator实例,用setter方法把预测分类的列名和真实分类的列名进行设置;然后计算预测准确率和错误率  

    scala> val evaluator = new MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction")
    evaluator: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_a80353e4211d
     
    scala> val lrAccuracy = evaluator.evaluate(lrPredictions)
    lrAccuracy: Double = 1.0
     
    scala> println("Test Error = " + (1.0 - lrAccuracy))
    Test Error = 0.0
    

    接下来我们可以通过model来获取我们训练得到的逻辑斯蒂模型。前面已经说过model是一个PipelineModel,因此我们可以通过调用它的stages来获取模型,具体如下:  

    scala> val lrModel = lrPipelineModel.stages(2).asInstanceOf[LogisticRegressionModel]
    lrModel: org.apache.spark.ml.classification.LogisticRegressionModel = logreg_692899496c23
     
    scala> println("Coefficients: " + lrModel.coefficients+"Intercept: "+lrModel.intercept+"numClasses: "+lrModel.numClasses+"numFeatures: "+lrModel.numFeatures)
    Coefficients: [-0.0396171957643483,0.0,0.0,0.07240315639651046]Intercept: -0.23127346342015379numClasses: 2numFeatures: 4

    2.用多项逻辑斯蒂回归来解决二分类问题

    3.用多项逻辑斯蒂回归来解决多分类问题

    二、决策树分类器

      决策树(decision tree)是一种基本的分类与回归方法,这里主要介绍用于分类的决策树。决策树模式呈树形结构,其中每个内部节点表示一个属性上的测试,每个分支代表一个测试输出,每个叶节点代表一种类别。学习时利用训练数据,根据损失函数最小化的原则建立决策树模型;预测时,对新的数据,利用决策树模型进行分类

      决策树学习通常包括3个步骤:特征选择、决策树的生成和决策树的剪枝。

    1.特征选择

      特征选择在于选取对训练数据具有分类能力的特征,这样可以提高决策树学习的效率。通常特征选择的准则是信息增益(或信息增益比、基尼指数等),每次计算每个特征的信息增益,并比较它们的大小,选择信息增益最大(信息增益比最大、基尼指数最小)的特征

    2.决策树的生成 

    1. 从根结点开始,对结点计算所有可能的特征的信息增益,选择信息增益最大的特征作为结点的特征,由该特征的不同取值建立子结点,再对子结点递归地调用以上方法,构建决策树;直到所有特征的信息增均很小或没有特征可以选择为止,最后得到一个决策树。
    2. ​决策树需要有停止条件来终止其生长的过程。一般来说最低的条件是:当该节点下面的所有记录都属于同一类,或者当所有的记录属性都具有相同的值时。这两种条件是停止决策树的必要条件,也是最低的条件。在实际运用中一般希望决策树提前停止生长,限定叶节点包含的最低数据量,以防止由于过度生长造成的过拟合问题。

    3.决策树的剪枝

      决策树生成算法递归地产生决策树,直到不能继续下去为止。这样产生的树往往对训练数据的分类很准确,但对未知的测试数据的分类却没有那么准确,即出现过拟合现象。解决这个问题的办法是考虑决策树的复杂度,对已生成的决策树进行简化,这个过程称为剪枝。

      我们以iris数据集(iris)为例进行分析(iris下载地址:http://dblab.xmu.edu.cn/blog/wp-content/uploads/2017/03/iris.txt)iris以鸢尾花的特征作为数据来源,数据集包含150个数据集,分为3类,每类50个数据,每个数据包含4个属性,是在数据挖掘、数据分类中非常常用的测试集、训练集。

    (1)导入需要的包

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.ml.linalg.{Vector,Vectors}
    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
    

    (2)读取数据,简要分析  

    scala> import spark.implicits._
    import spark.implicits._
     
    scala> case class Iris(features: org.apache.spark.ml.linalg.Vector, label: String)
    defined class Iris
     
    scala> val data = spark.sparkContext.textFile("file:///usr/local/spark/iris.txt").map(_.split(",")).map(p => Iris(Vectors.dense(p(0).toDouble,p(1).toDouble,p(2).toDouble, p(3).toDouble),p(4).toString())).toDF()
    data: org.apache.spark.sql.DataFrame = [features: vector, label: string]
    
    scala> data.createOrReplaceTempView("iris")
     
    scala> val df = spark.sql("select * from iris")
    df: org.apache.spark.sql.DataFrame = [features: vector, label: string]
     
    scala> df.map(t => t(1)+":"+t(0)).collect().foreach(println)
    Iris-setosa:[5.1,3.5,1.4,0.2]
    Iris-setosa:[4.9,3.0,1.4,0.2]
    Iris-setosa:[4.7,3.2,1.3,0.2]
    Iris-setosa:[4.6,3.1,1.5,0.2]
    Iris-setosa:[5.0,3.6,1.4,0.2]
    Iris-setosa:[5.4,3.9,1.7,0.4]
    Iris-setosa:[4.6,3.4,1.4,0.3]
     
    ... ... 
    

    (3)进一步处理特征和标签,以及数据分组

    //分别获取标签列和特征列,进行索引,并进行了重命名。
    scala> val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol(
    "indexedLabel").fit(df)
    labelIndexer: org.apache.spark.ml.feature.StringIndexerModel = strIdx_107f7e530fa7
     
    scala> val featureIndexer = new VectorIndexer().setInputCol("features").setOutpu
    tCol("indexedFeatures").setMaxCategories(4).fit(df)
    featureIndexer: org.apache.spark.ml.feature.VectorIndexerModel = vecIdx_0649803dfa70
    
    //这里我们设置一个labelConverter,目的是把预测的类别重新转化成字符型的。
    scala> val labelConverter = new IndexToString().setInputCol("prediction").setOut
    putCol("predictedLabel").setLabels(labelIndexer.labels)
    labelConverter: org.apache.spark.ml.feature.IndexToString = idxToStr_046182b2e571
    //接下来,我们把数据集随机分成训练集和测试集,其中训练集占70%。
    scala> val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
    trainingData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, label: string]
    testData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, label: string]

    (4)构建决策树分类模型

    //导入所需要的包
    import org.apache.spark.ml.classification.DecisionTreeClassificationModel
    import org.apache.spark.ml.classification.DecisionTreeClassifier
    import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
    //训练决策树模型,这里我们可以通过setter的方法来设置决策树的参数,也可以用ParamMap来设置(具体的可以查看spark mllib的官网)。具体的可以设置的参数可以通过explainParams()来获取。
    scala> val dtClassifier = new DecisionTreeClassifier().setLabelCol("indexedLabel
    ").setFeaturesCol("indexedFeatures")
    dtClassifier: org.apache.spark.ml.classification.DecisionTreeClassifier = dtc_029ea28aceb1
    //在pipeline中进行设置
    scala> val pipelinedClassifier = new Pipeline().setStages(Array(labelIndexer, featureIndexer, dtClassifier, labelConverter))
    pipelinedClassifier: org.apache.spark.ml.Pipeline = pipeline_a254dfd6dfb9
    
    //训练决策树模型
    scala> val modelClassifier = pipelinedClassifier.fit(trainingData)
    modelClassifier: org.apache.spark.ml.PipelineModel = pipeline_a254dfd6dfb9
    //进行预测
    scala> val predictionsClassifier = modelClassifier.transform(testData)
    predictionsClassifier: org.apache.spark.sql.DataFrame = [features: vector, label: string ... 6 more fields]
    //查看部分预测的结果
    scala> predictionsClassifier.select("predictedLabel", "label", "features").show(20)
    +---------------+---------------+-----------------+
    | predictedLabel| label| features|
    +---------------+---------------+-----------------+
    | Iris-setosa| Iris-setosa|[4.4,2.9,1.4,0.2]|
    | Iris-setosa| Iris-setosa|[4.6,3.6,1.0,0.2]|
    | Iris-virginica|Iris-versicolor|[4.9,2.4,3.3,1.0]|
    | Iris-setosa| Iris-setosa|[4.9,3.1,1.5,0.1]|
    | Iris-setosa| Iris-setosa|[4.9,3.1,1.5,0.1]|
    

    (5)评估决策树分类模型 

    scala> val evaluatorClassifier = new MulticlassClassificationEvaluator().s
    etLabelCol("indexedLabel").setPredictionCol("prediction").setMetricName("accuracy")
    evaluatorClassifier: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_4abc19f3a54d
     
    scala> val accuracy = evaluatorClassifier.evaluate(predictionsClassifier)
    accuracy: Double = 0.8648648648648649
     
    scala> println("Test Error = " + (1.0 - accuracy))
    Test Error = 0.1351351351351351
     
    scala> val treeModelClassifier = modelClassifier.stages(2).asInstanceOf[De
    cisionTreeClassificationModel]
    treeModelClassifier: org.apache.spark.ml.classification.DecisionTreeClassificati
    onModel = DecisionTreeClassificationModel (uid=dtc_029ea28aceb1) of depth 5 with 13 nodes
    
    scala> println("Learned classification tree model:
    " + treeModelClassifier.toDebugString)
    Learned classification tree model:
    DecisionTreeClassificationModel (uid=dtc_029ea28aceb1) of depth 5 with 13 nodes
    If (feature 2 <= 1.9)
    Predict: 2.0
    Else (feature 2 > 1.9)
    If (feature 2 <= 4.7)
    If (feature 0 <= 4.9)
    Predict: 1.0
    Else (feature 0 > 4.9)
    Predict: 0.0
    Else (feature 2 > 4.7)
    If (feature 3 <= 1.6)
    If (feature 2 <= 4.8)
    Predict: 0.0
    Else (feature 2 > 4.8)
    If (feature 0 <= 6.0)
    Predict: 0.0
    Else (feature 0 > 6.0)
    Predict: 1.0
    Else (feature 3 > 1.6)
    Predict: 1.0
    

    (6)构建决策树回归模型  

    //导入所需要的包
    import org.apache.spark.ml.evaluation.RegressionEvaluator
    import org.apache.spark.ml.regression.DecisionTreeRegressionModel
    import org.apache.spark.ml.regression.DecisionTreeRegressor
    //训练决策树模型
    scala> val dtRegressor = new DecisionTreeRegressor().setLabelCol("indexedLabel")
    .setFeaturesCol("indexedFeatures")
    dtRegressor: org.apache.spark.ml.regression.DecisionTreeRegressor = dtr_358e08c37f0c
    //在pipeline中进行设置
    scala> val pipelineRegressor = new Pipeline().setStages(Array(labelIndexer, featureIndexer, dtRegressor, labelConverter))
    pipelineRegressor: org.apache.spark.ml.Pipeline = pipeline_ae699675d015
    
    //训练决策树模型
    scala> val modelRegressor = pipelineRegressor.fit(trainingData)
    modelRegressor: org.apache.spark.ml.PipelineModel = pipeline_ae699675d015
    //进行预测
    scala> val predictionsRegressor = modelRegressor.transform(testData)
    predictionsRegressor: org.apache.spark.sql.DataFrame = [features: vector, label: string ... 4 more fields]
    //查看部分预测结果
    scala> predictionsRegressor.select("predictedLabel", "label", "features").show(20)
    +---------------+---------------+-----------------+
    | predictedLabel| label| features|
    +---------------+---------------+-----------------+
    | Iris-setosa| Iris-setosa|[4.4,2.9,1.4,0.2]|
    | Iris-setosa| Iris-setosa|[4.6,3.6,1.0,0.2]|
    | Iris-virginica|Iris-versicolor|[4.9,2.4,3.3,1.0]|
    | Iris-setosa| Iris-setosa|[4.9,3.1,1.5,0.1]|
    | Iris-setosa| Iris-setosa|[4.9,3.1,1.5,0.1]|
    

    (7)评估决策树回归模型

    scala> val evaluatorRegressor = new RegressionEvaluator().setLabelCol("ind
    exedLabel").setPredictionCol("prediction").setMetricName("rmse")
    evaluatorRegressor: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_425d2aeea2dd
     
    scala> val rmse = evaluatorRegressor.evaluate(predictionsRegressor)
    rmse: Double = 0.3676073110469039
     
    scala> println("Root Mean Squared Error (RMSE) on test data = " + rmse)
    Root Mean Squared Error (RMSE) on test data = 0.3676073110469039
     
    scala> val treeModelRegressor = modelRegressor.stages(2).asInstanceOf[Deci
    sionTreeRegressionModel]
    treeModelRegressor: org.apache.spark.ml.regression.DecisionTreeRegressionModel =
    DecisionTreeRegressionModel (uid=dtr_358e08c37f0c) of depth 5 with 13 nodes
    
    scala> println("Learned regression tree model:
    " + treeModelRegressor.toDebugString)
    Learned regression tree model:
    DecisionTreeRegressionModel (uid=dtr_358e08c37f0c) of depth 5 with 13 nodes
    If (feature 2 <= 1.9)
    Predict: 2.0
    Else (feature 2 > 1.9)
    If (feature 2 <= 4.7)
    If (feature 0 <= 4.9)
    Predict: 1.0
    Else (feature 0 > 4.9)
    Predict: 0.0
    Else (feature 2 > 4.7)
    If (feature 3 <= 1.6)
    If (feature 2 <= 4.8)
    Predict: 0.0
    Else (feature 2 > 4.8)
    If (feature 0 <= 6.0)
    Predict: 0.5
    Else (feature 0 > 6.0)
    Predict: 1.0
    Else (feature 3 > 1.6)
    Predict: 1.0
    

    从上述结果可以看到模型的标准误差为 0.3676073110469039以及训练的决策树模型结构。

  • 相关阅读:
    数据库的ACID
    动态SQL (if , choose (when, otherwise) , trim (where, set) , set , foreach)
    接口 和xml 中的知识
    Mybatis简介 接口式编程
    批处理 编程式事务
    AOP实现日志打印 基于xml配置的AOP实现 切入点表达式
    1、使用注解配置bean @Controller @Service @Repository 2.基于xml的属性装配 3、context:include-filter指定扫描包时要包含的类 ,context:exclude-filter(不包含)
    数据库连接池简介,使用
    配置通过静态工厂方法创建的bean , 配置通过实例工厂方法创建的bean , 配置FactoryBean★
    SpringMVC 拦截器 异常
  • 原文地址:https://www.cnblogs.com/nxf-rabbit75/p/12046144.html
Copyright © 2011-2022 走看看