zoukankan      html  css  js  c++  java
  • 使用spark ml pipeline进行机器学习

    一、关于spark ml pipeline与机器学习

    一个典型的机器学习构建包含若干个过程
    1、源数据ETL
    2、数据预处理
    3、特征选取
    4、模型训练与验证
    以上四个步骤可以抽象为一个包括多个步骤的流水线式工作,从数据收集开始至输出我们需要的最终结果。因此,对以上多个步骤、进行抽象建模,简化为流水线式工作流程则存在着可行性,对利用spark进行机器学习的用户来说,流水线式机器学习比单个步骤独立建模更加高效、易用。
     scikit-learn 项目的启发,并且总结了MLlib在处理复杂机器学习问题的弊端(主要为工作繁杂,流程不清晰),旨在向用户提供基于DataFrame 之上的更加高层次的 API 库,以更加方便的构建复杂的机器学习工作流式应用。一个pipeline 在结构上会包含一个或多个Stage,每一个 Stage 都会完成一个任务,如数据集处理转化,模型训练,参数设置或数据预测等,这样的Stage 在 ML 里按照处理问题类型的不同都有相应的定义和实现。两个主要的stageTransformerEstimatorTransformer主要是用来操作一个DataFrame 数据并生成另外一个DataFrame 数据,比如svm模型、一个特征提取工具,都可以抽象为一个TransformerEstimator 则主要是用来做模型拟合用的,用来生成一个Transformer。可能这样说比较难以理解,下面就以一个完整的机器学习案例来说明spark ml pipeline是怎么构建机器学习工作流的。

    二、使用spark ml pipeline构建机器学习工作流

    在此以Kaggle数据竞赛Display Advertising Challenge的数据集(该数据集为利用用户特征进行广告点击预测)开始,利用spark ml pipeline构建一个完整的机器学习工作流程。
    Display Advertising Challenge的这份数据本身就不多做介绍了,主要包括3部分,numerical型特征集、Categorical类型特征集、类标签。
    首先,读入样本集,并将样本集划分为训练集与测试集:
           //使用file标记文件路径,允许spark读取本地文件
            String fileReadPath = "file:\D:\dac_sample\dac_sample.txt";
            //使用textFile读入数据
            SparkContext sc = Contexts.sparkContext;
            RDD<String> file = sc.textFile(fileReadPath,1);
            JavaRDD<String> sparkContent = file.toJavaRDD();
            JavaRDD<Row> sampleRow = sparkContent.map(new Function<String, Row>() {
                public Row call(String string) {
                    String tempStr = string.replace("	",",");
                    String[] features = tempStr.split(",");
                    int intLable= Integer.parseInt(features[0]);
                    String intFeature1  = features[1];
                    String intFeature2  = features[2];                String CatFeature1 = features[14];
                    String CatFeature2 = features[15];
                    return RowFactory.create(intLable, intFeature1, intFeature2, CatFeature1, CatFeature2);
                }
            });
    
    
            double[] weights = {0.8, 0.2};
            Long seed = 42L;
            JavaRDD<Row>[] sampleRows = sampleRow.randomSplit(weights,seed);
    
    
    
    
    
    得到样本集后,构建出
     DataFrame格式的数据供spark ml pipeline使用:
            List<StructField> fields = new ArrayList<StructField>();
            fields.add(DataTypes.createStructField("lable", DataTypes.IntegerType, false));
            fields.add(DataTypes.createStructField("intFeature1", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("intFeature2", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("CatFeature1", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("CatFeature2", DataTypes.StringType, true));
            //and so on
    
    
            StructType schema = DataTypes.createStructType(fields);
            DataFrame dfTrain = Contexts.hiveContext.createDataFrame(sampleRows[0], schema);//训练数据
            dfTrain.registerTempTable("tmpTable1");
            DataFrame dfTest = Contexts.hiveContext.createDataFrame(sampleRows[1], schema);//测试数据
            dfTest.registerTempTable("tmpTable2");
    
    
    由于在dfTraindfTest中所有的特征目前都为string类型,而机器学习则要求其特征为numerical类型,在此需要对特征做转换,包括类型转换和缺失值的处理。
    首先,将intFeaturestring转为doublecast()方法将表中指定列string类型转换为double类型,并生成新列并命名为intFeature1Temp
    之后,需要删除原来的数据列 并将新列重命名为intFeature1,这样,就将string类型的特征转换得到double类型的特征了。
            //Cast integer features from String to Double
           dfTest = dfTest.withColumn("intFeature1Temp",dfTest.col("intFeature1").cast("double"));
           dfTest = dfTest.drop("intFeature1").withColumnRenamed("intFeature1Temp","intFeature1");
    如果intFeature特征是年龄或者特征等类型,则需要进行分箱操作,将一个特征按照指定范围进行划分:
            /*特征转换,部分特征需要进行分箱,比如年龄,进行分段成成年未成年等 */
            double[] splitV = {0.0,16.0,Double.MAX_VALUE};
            Bucketizer bucketizer = new Bucketizer().setInputCol("").setOutputCol("").setSplits(splitV);

    再次,需要将categorical 类型的特征转换为numerical类型。主要包括两个步骤,缺失值处理和编码转换。
    缺失值处理方面,可以使用全局的NA来统一标记缺失值:
            /*将categoricalb类型的变量的缺失值使用NA值填充*/
            String[] strCols = {"CatFeature1","CatFeature2"};
            dfTrain = dfTrain.na().fill("NA",strCols);
            dfTest = dfTest.na().fill("NA",strCols);

    缺失值处理完成之后,就可以正式的对categorical类型的特征进行numerical转换了。在spark ml中,可以借助StringIndexeroneHotEncoder完成
    这一任务:
            // StringIndexer  oneHotEncoder 将 categorical变量转换为 numerical 变量
            // 如某列特征为星期几、天气等等特征,则转换为七个0-1特征
            StringIndexer cat1Index = new StringIndexer().setInputCol("CatFeature1").setOutputCol("indexedCat1").setHandleInvalid("skip");
            OneHotEncoder cat1Encoder = new OneHotEncoder().setInputCol(cat1Index.getOutputCol()).setOutputCol("CatVector1");
            StringIndexer cat2Index = new StringIndexer().setInputCol("CatFeature2").setOutputCol("indexedCat2");
            OneHotEncoder cat2Encoder = new OneHotEncoder().setInputCol(cat2Index.getOutputCol()).setOutputCol("CatVector2");

    至此,特征预处理步骤基本完成了。由于上述特征都是处于单独的列并且列名独立,为方便后续模型进行特征输入,需要将其转换为特征向量,并统一命名,
    可以使用VectorAssembler类完成这一任务:
            /*转换为特征向量*/
            String[] vectorAsCols = {"intFeature1","intFeature2","CatVector1","CatVector2"};
            VectorAssembler vectorAssembler = new VectorAssembler().setInputCols(vectorAsCols).setOutputCol("vectorFeature");
    通常,预处理之后获得的特征有成千上万维,出于去除冗余特征、消除维数灾难、提高模型质量的考虑,需要进行选择。在此,使用卡方检验方法,
    利用特征与类标签之间的相关性,进行特征选取:
            /*特征较多时,使用卡方检验进行特征选择,主要是考察特征与类标签的相关性*/
            ChiSqSelector chiSqSelector = new ChiSqSelector().setFeaturesCol("vectorFeature").setLabelCol("label").setNumTopFeatures(10)
                    .setOutputCol("selectedFeature");

    在特征预处理和特征选取完成之后,就可以定义模型及其参数了。简单期间,在此使用LogisticRegression模型,并设定最大迭代次数、正则化项:
            /* 设置最大迭代次数和正则化参数 setElasticNetParam=0.0 为L2正则化 setElasticNetParam=1.0为L1正则化*/
            /*设置特征向量的列名,标签的列名*/
            LogisticRegression logModel = new LogisticRegression().setMaxIter(100).setRegParam(0.1).setElasticNetParam(0.0)
                    .setFeaturesCol("selectedFeature").setLabelCol("lable");

    在上述准备步骤完成之后,就可以开始定义pipeline并进行模型的学习了:
            /*将特征转换,特征聚合,模型等组成一个管道,并调用它的fit方法拟合出模型*/
            PipelineStage[] pipelineStage = {cat1Index,cat2Index,cat1Encoder,cat2Encoder,vectorAssembler,logModel};
            Pipeline pipline = new Pipeline().setStages(pipelineStage);
            PipelineModel pModle = pipline.fit(dfTrain);
    上面pipelinefit方法得到的是一个Transformer,我们可以使它作用于训练集得到模型在训练集上的预测结果:
            //拟合得到模型的transform方法进行预测
            DataFrame output = pModle.transform(dfTest).select("selectedFeature", "label", "prediction", "rawPrediction", "probability");
            DataFrame prediction = output.select("label", "prediction");
            prediction.show();

    分析计算,得到模型在训练集上的准确率,看看模型的效果怎么样:
            /*测试集合上的准确率*/
            long correct = prediction.filter(prediction.col("label").equalTo(prediction.col("'prediction"))).count();
            long total = prediction.count();
            double accuracy = correct / (double)total;
    
            System.out.println(accuracy);

    最后,可以将模型保存下来,下次直接使用就可以了:
            String pModlePath = ""file:\D:\dac_sample\";
            pModle.save(pModlePath);

    三,梳理和总结:

    上述,借助代码实现了基于spark ml pipeline的机器学习,包括数据转换、特征生成、特征选取、模型定义及模型学习等多个stage,得到的pipeline
    模型后,就可以在新的数据集上进行预测,总结为两部分并用流程图表示如下:
    训练阶段:

    预测阶段:


    借助于Pepeline,在spark上进行机器学习的数据流向更加清晰,同时每一stage的任务也更加明了,因此,无论是在模型的预测使用上、还是
    模型后续的改进优化上,都变得更加容易。




  • 相关阅读:
    java基础部分的一些有意思的东西。
    antdvue按需加载插件babelpluginimport报错
    阿超的烦恼 javaScript篇
    .NET E F(Entity Framework)框架 DataBase First 和 Code First 简单用法。
    JQuery获得input ID相同但是type不同的方法
    gridview的删除,修改,数据绑定处理
    jgGrid数据格式
    Cannot read configuration file due to insufficient permissions
    Invoke action which type of result is JsonResult on controller from view using Ajax or geJSon
    Entity model数据库连接
  • 原文地址:https://www.cnblogs.com/cl1024cl/p/6205036.html
Copyright © 2011-2022 走看看