zoukankan      html  css  js  c++  java
  • Python+Spark2.0+hadoop学习笔记——Spark ML Pipeline机器学习流程

    情况一:二元分类

    这部分使用的数据集是判断网页是暂时的还是长青的。因为涉及到了文本的信息,所以需要进行文本的数字化和向量化。

    在这部分中,机器学习分为三个部分,第一部分是建立机器学习流程pipeline,第二部分是训练,第三部分是预测。

    在建立机器学习流程pipeline中包含4个阶段,如下所示:

    StringIndexer:将文字的分类特征转换为数字。

    OneHotEncoder:将一个数字的分类特征字段转为多个字段。

    VectorAssembler:将所有的特征字段整合成一个Vector字段。

    DesionTreeClassifier:进行训练并且产生模型。

    训练过程是指“训练数据DataFrame”使用pipeline.fit()进行训练,然后产生pipelineModel模型。

    预测过程是指“新数据DataFrame”使用pipelineModel.transform()进行预测,预测完成后会产生“预测结果DataFrame”。

    这部分先使用DecisionTree Classifier Model进行预测,代码如下:

    global Path
    if sc.master[0:5]=="local":
    Path="file:/home/jorlinlee/pythonwork/PythonProject/"
    else:
    Path="hdfs://master:9000/user/jorlinlee"

    创建row_dr DataFrame

    row_df=sqlContext.read.format("csv").option("header","true").option("delimiter"," ").load(Path+"data/train.tsv")

    编写DataFrames UDF 用户自定义函数(将"?"转换为"0")

    from pyspark.sql.functions import udf

    def replace_question(x):

    return("0" if x=="?" else x)

    replace_question=udf(replace_question)

    将string数据类型转换为double数据类型

    from pyspark.sql.functions import col

    import pyspark.sql.types

    df=row_df.select(

    ['url','alchemy_category'] +

    [replace_question(col(colimn)).cast("double").alias(column)

    for column in row_df.columns[4:]])

    将数据分成train_df与test_df

    train_df,test_df=df.randomSplit([0.7,0.3])
    train_df.cache()
    test_df.cache()

    使用StringIndexer:将字符串分类特征字段转换为数值

    from pyspark.ml.feature import StringIndexer

    categoryIndexer=StringIndexer(inputCol='alchemy_category',outputCol="alchemy_category_Index")

    categoryTransformer=categoryIndexer.fit(df)

    df1=categoryTransformer.transform(train_df)

    使用OneHotEncoder:将一个数值的分类特征字段转换为多个字段的Vector

    from pyspark.ml.feature import OneHotEncoder

    encoder=OneHotEncoder(dropLast=false,inputCol='alchemy_category_Index',outputCol="alchemy_category_IndexVec")

    df2=encoder.transform(df1)

    使用VectorAssembler:将多个特征字段整合成一个特征的Vector

    from pyspark.ml.feature import VectorAssembler

    assemblerInputs=['alchemy_category_IndexVec'] + row_df.columns[4:-1]

    assembler=VectorAssembler(inputCols=assemblerInputs,outputCol="feature")

    df3=assembler.transform(df2)

    使用DecisionTreeClassifier二元分类

    from pyspark.ml.classification import DecisionTreeClassifier

    dt=DecisionTreeClassifier(labelCol="label",featuresCol="features",impurity="gini",maxDepth=10,maxBins=14)

    dt_model=dt.fit(df3)

    进行预测

    df4=dt_model.transform(df3)

    以上是分解过程,下面使用pipeline流程组件

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer,OneHotEncoder,VectorAssembler
    from pyspark.ml.classification import DecisionTreeClassifier

    建立pipeline

    stringIndexer=StringIndexer(inputCol='alchemy_category',
    outputCol="alchemy_category_Index")
    encoder=OneHotEncoder(dropLast=False,
    inputCol='alchemy_category_Index',
    outputCol="alchemy_category_IndexVec")
    assemblerInputs=['alchemy_category_IndexVec']+row_df.columns[4:-1]
    assembler=VectorAssembler(inputCols=assemblerInputs,outputCol="feature")
    dt=DecisionTreeClassifier(labelCol="label",featuresCol="feature",impurity="gini",maxDepth=10,maxBins=14)
    pipeline=Pipeline(stages=[stringIndexer,encoder,assembler,dt])

    查看pipeline阶段

    pipeline.getStages()

    使用pipeline进行数据处理与训练

    pipelineModel=pipeline.fit(train_df)

    查看训练完成后的决策树模型(第3个阶段会产生决策树模型)

    pipelineModel.stages[3]

    使用pipelineModel.transform进行预测

    predicted=pipelineModel.transform(test_df)

    评估模型的准确性(使用AUC)

    from pyspark.ml.evaluation import BinaryClassificationEvaluator

    evaluator=BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol="label",
    metricName="areaUnderROC")

    auc=evaluator.evaluate(predicted)

    auc

    结果是:0.617

    提出改进方案:

    方案一:

    使用TrainValidation进行训练验证找出最佳模型

    from pyspark.ml.tuning import ParamGridBuilder,TrainValidationSplit

    设置训练验证的参数

    paramGrid=ParamGridBuilder().addGrid(dt.impurity,["gini","entropy"]).addGrid(dt.maxDepth,[5,10,15]).addGrid(dt.maxBins,[10,15,20]).build()

    创建TrainValidationSplit

    tvs=TrainValidationSplit(estimator=dt,evaluator=evaluator,estimatorParamMaps=paramGrid,trainRatio=0.8)

    建立tvs_pipeline

    tvs_pipeline=Pipeline(stages=[stringIndexer,encoder,assembler,tvs])

    使用tvs_pipeline流程进行训练验证

    tvs_pipelineModel=tvs_pipeline.fit(train_df)

    评估最佳模型AUC

    predictions=tvs_pipelineModel.transform(test_df)

    auc=evaluator.evaluate(predictions)

    auc

    结果:0.656

    方案二:使用crossValidation交叉验证找出最佳模型

    from pyspark.ml.tuning import CrossValidator

    建立交叉验证的CrossValidator(与之前的paramGrid有联系)

    cv=CrossValidator(estimator=dt,evaluator=evaluator,estimatorParamMaps=paramGrid,numFolds=3)

    建立交叉验证的cv_pipeline

    cv_pipeline=Pipeline(stages=[stringIndexer,encoder,assembler,cv])

    训练模型

    cv_pipelineModel=cv_pipeline.fit(train_df)

    评估最佳模型AUC

    predictions=cv_pipelineModel.transform(test_df)

    auc=evaluator.evaluate(predictions)

    auc

    结果:0.658

    方案三:使用随机森林RandomForestClassifier分类器

    from pyspark.ml.classification import RandomForestClassifier

    建立随机森林分类模型

    rf=RandomForestClassifier(labelCol="label",featuresCol="feature",numTrees=10)

    建立随机森林分类pipeline

    rfpipeline=Pipeline(stages=[stringIndexer,encoder,assembler,rf])

    对随机森林模型进行训练

    rfpipelineModel=rfpipeline.fit(train_df)

    使用模型进行预测

    rfpredicted=rfpipelineModel.transform(test_df)

    auc=evaluator.evaluate(rfpredicted)

    auc

    结果:0.738

    使用RandomForestClassifier TrainValidation找出最佳模型

    from pyspark.ml.tuning import ParamGridBuilder,TrainValidationSplit

    paramGrid=ParamGridBuilder().addGrid(rf.impurity,['gini','entropy']).addGrid(rf.maxDepth,[5,10,15]).addGrid(rf.maxBins,[10,15,20]).addGrid(rf.numTrees,[10,20,30]).build()

    rftvs=TrainValidationSplit(estimator=rf,evaluator=evaluator,estimatorParamMaps=paramGrid,trainRatio=0.8)

    rftvs_pipeline=Pipeline(stages=[stringIndexer,encoder,assembler,rftvs])

    rftvs_pipelineModel=rftvs_pipeline.fit(train_df)

    rftvspredcitions=rftvs_pipelineMode.transform(test_df)

    auc=evaluator.evaluate(rftvspredictions)

    auc

    结果是:0.760

    使用crossValidation找出最佳模型

    from pyspark.ml.tuning import CrossValidator,ParamGridBuilder

    rfcv=CrossValidator(estimator=rf,evaluator=evaluator,estimatorParamMaps=paramGrid,numFolds=3)

    rfcv_pipeline=Pipeline(stages=[stringIndexer,encoder,assembler,rfcv])

    rfcv_pipelineModel=rfcv_pipeline.fit(train_df)

    rfcvpredictions=rfcv_pipelineModel.transform(test_df)

    auc=evaluator.evaluate(rfcvpredictions)

    auc

    结果:0.762

    情况二:多元分类

    这部分使用的数据是森林覆盖树种数据。数据中没有涉及到文本数据,因此在建立pipeline时使用VectorAssembler和相应的机器学习模型就好了。

    读取数据

    global path
    if sc.master[0:5]=="local":
    Path="file:/home/jorlinlee/pythonwork/PythonProject/"
    else:
    Path="hdfs://master:9000/user/jorlinlee/"

    rawData=sc.textFile(Path+"data/covtype.data")
    lines=rawData.map(lambda x:x.split(","))

    因为这份数据最后需要转换成DataFrame形式,但是这份数据没有字段名,因此需要人工增加

    from pyspark.sql.types import StringType, StructField, StructType

    fields=[StructField("f"+str(i), StringType(), True) for i in range(fieldnum)]

    schema=StructType(fields)

    构造DataFrame

    covtype_df=spark.createDataFrame(lines,schema)

    将string格式转换为double

    from pyspark.sql.functions import col
    covtype_df=covtype_df.select([col(column).cast("double").alias(column) for column in covtype_df.columns])

    创建特征字段List

    featuresCols=covtype_df.columns[:54]

    创建label字段

    covtype_df=covtype_df.withColumn("label",covtype_df["f54"] - 1).drop("f54")

    将数据分成train_df与test_df

    train_df,test_df=covtype_df.randomSplit([0.7,0.3])

    train_df.cache()

    test_df.cache()

    导入模块

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml.classification import DecisionTreeClassifier

    建立pipeline

    vectorAssembler=VectorAssembler(inputCols=featuresCols,outputCol="features")
    dt=DecisionTreeClassifier(labelCol="label",featuresCol="features",maxDepth=5,maxBins=20)
    dt_pipeline=Pipeline(stages=[vectorAssembler,dt])

    使用pipeline进行训练

    pipelineModel=dt_pipeline.fit(train_df)

    使用pipelinModel进行预测

    from pyspark.ml.evaluation import MulticlassClassificationEvaluator

    evaluator=MulticlassClassificationEvaluator(labelCol="label",predictionCol="prediction",metricName="accuracy")

    predicted=pipelineModel.transform(test_df)

    accuracy=evaluator.evaluate(predictions)

    accuracy

    结果:0.703

    使用TrainValidation进行训练验证找出最佳模型

    from pyspark.ml.tuning import ParamGridBuilder,TrainValidationSplit

    paramGrid=ParamGridBuilder().addGrid(dt.impurity,["gini","entropy"]).addGrid(dt.maxDepth,[10,15,25]).addGrid(dt.maxBins,[30,40,50]).build()

    tvs=TrainValidationSplit(estimator=dt,evaluator=evaluator,estimatorParamMaps=paramGrid,trainRatio=0.8)

    tvs_pipeline=Pipeline(stages=[vectorAssembler,tvs])

    tvs_pipelineModel=tvs_pipeline.fit(train_df)

    predictions=tvs_pipelineModel.transform(test_df)

    accuracy=evaluator.evaluate(predictions)
    accuracy

    结果:0.930

    情况三:回归分析

    这部分使用的是共享单车预测的数据。这部分在建立pipeline时使用VectorAssembler(将所有的特征字段整合成vector)、VectorIndexer(将不重复数值的数量小于等于maxCategories参数值所对应的字段视为分类字段,否则视为数值字段)和机器学习模型。

    代码如下:

    导入数据

    global Path
    if sc.master[0:5]=="local":
    Path="file:/home/jorlinlee/pythonwork/PythonProject/"
    else:
    Path="hdfs://master:9000/user/jorlinlee/"

    hour_df=spark.read.format('csv').option("header",'true').load(Path+"data/hour.csv")

    去掉不重要的字段

    hour_df=hour_df.drop("instant").drop("dteday").drop("yr").drop("casual").drop("registered")

    数据类型变换

    from pyspark.sql.functions import col

    hour_df=hour_df.select([col(column).cast("double").alias(column) for column in hour_df.columns])

    建立pipeline流程

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer,VectorIndexer,VectorAssembler
    from pyspark.ml.regression import DecisionTreeRegressor

    featuresCols=hour_df.columns[:-1]

    vectorAssembler=VectorAssembler(inputCols=featuresCols,outputCol="aFeatures")vectorIndexer=VectorIndexer(inputCol="aFeatures",outputCol="features",maxCategories=24)

    dt=DecisionTreeRegressor(labelCol="cnt",featuresCol="features")

    dt_pipeline=Pipeline(stages=[vectorAssembler,vectorIndexer,dt])

    分割数据

    train_df,test_df=hour_df.randomSplit([0.7,0.3])

    train_df.cache()

    test_df.cache()

    使用pipeline进行数据处理与训练

    dt_pipelineModel=dt_pipeline.fit(train_df)

    predicted_df=dt_pipelineModel.transform(test_df)

    评估模型的准确率

    from pyspark.ml.evaluation import RegressionEvaluator

    evaluator=RegressionEvaluator(labelCol='cnt',predictionCol='prediction',metricName="rmse")

    predicted_df=dt_pipelineModel.transform(test_df)

    rmse=evaluator.evaluate(predicted_df)

    rmse

    结果:95.617

    使用TrainValidation进行训练验证找出最佳模型

    from pyspark.ml.tuning import ParamGridBuilder,TrainValidationSplit

    paramGrid=ParamGridBuilder().addGrid(dt.maxDepth,[5,10,15,25]).addGrid(dt.maxBins,[25,35,45,50]).build()

    tvs=TrainValidationSplit(estimator=dt,evaluator=evaluator,estimatorParamMaps=paramGrid,trainRatio=0.8)

    tvs_pipeline=Pipeline=Pipeline(stages=[vectorAssembler,vectorIndexer,tvs])

    tvs_pipelineModel=tvs_pipeline.fit(train_df)

    predictions=tvs_pipelineModel.transform(test_df)

    rmse=evaluator.evaluate(predictions)

    rmse

    结果:78.285

    使用crossValidation进行交叉验证找出最佳模型

    from pyspark.ml.tuning import CrossValidator
    from pyspark.ml import Pipeline

    cv=CrossValidator(estimator=dt,evaluator=evaluator,estimatorParamMaps=paramGrid,numFolds=3)

    cv_pipeline=Pipeline(stages=[vectorAssembler,vectorIndexer,cv])

    cv_pipelineModel=cv_pipeline.fit(train_df)

    predictions=cv_pipelineModel.transform(test_df)

    rmse=evaluator.evaluate(predictions)

    rmse

    结果:78.457

    使用GBT Regression(梯度提升树,一次只产生一棵决策树,再根据前一个决策树的结果决定如何产生下一个决策树)

    from pyspark.ml.regression import GBTRegressor

    gbt=GBTRegressor(labelCol='cnt',featuresCol='features')

    gbt_pipeline=Pipeline(stages=[vectorAssembler,vectorIndexer,gbt])

    gbt_pipelineModel=gbt_pipeline.fit(train_df)

    predicted_df=gbt_pipelineModel.transform(test_df)

    rmse=evaluator.evaluate(predicted_df)

    rmse

    结果:75.699

    使用GBT Regression CrossValidation找出最佳模型

    from pyspark.ml.tuning import CrossValidator,ParamGridBuilder
    from pyspark.ml.evaluation import RegressionEvaluator
    from pyspark.ml import Pipeline

    paramGrid=ParamGridBuilder().addGrid(gbt.maxDepth,[5,10]).addGrid(gbt.maxBins,[25,40]).addGrid(gbt.maxIter,[10,50]).build()

    cv=CrossValidator(estimator=gbt,evaluator=evaluator,estimatorParamMaps=paramGrid,numFolds=3)

    cv_pipeline=Pipeline(stages=[vectorAssembler,vectorIndexer,cv])

    cv_pipelineModel=cv_pipeline.fit(train_df)

    predicted_df=cv_pipelineModel.transform(test_df)

    evaluator=RegressionEvaluator(labelCol='cnt',predictionCol='prediction',metricName="rmse")

    rmse=evaluator.evaluate(predicted_df)

    rmse

    结果:70.732

  • 相关阅读:
    开源协议介绍
    Guice vs Dependency Injection By Hand
    Eclipse与MyEclipse的联系和区别
    Java Basic
    解决Windows Vista 英文版中文软件乱码
    [转]Java B/S开发模式漫谈
    什么是Groovy
    JBoss, Geronimo, Tomcat
    一个让你迅速理解Javabean的实例
    keepalive 原理讲解 salami
  • 原文地址:https://www.cnblogs.com/zhuozige/p/12668077.html
Copyright © 2011-2022 走看看