  • Spark Machine Learning Basics: Supervised Learning

    Supervised Learning

    0. Linear Regression (with L1/L2 regularization)

    from __future__ import print_function
    from pyspark.ml.regression import LinearRegression
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
        .builder \
        .appName("LinearRegressionWithElasticNet") \
        .getOrCreate()

    # Load the data (LIBSVM format)
    training = spark.read.format("libsvm") \
        .load("data/mllib/sample_linear_regression_data.txt")
    
    lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
    
    # Fit the model
    lrModel = lr.fit(training)
    
    # Print the coefficients and intercept
    print("Coefficients: %s" % str(lrModel.coefficients))
    print("Intercept: %s" % str(lrModel.intercept))
    
    # Summarize the model over the training set and print some metrics
    trainingSummary = lrModel.summary
    print("numIterations: %d" % trainingSummary.totalIterations)
    print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
    trainingSummary.residuals.show()
    print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
    print("r2: %f" % trainingSummary.r2)
    
    spark.stop()
    

    Output:

    Coefficients: [0.0,0.322925166774,-0.343854803456,1.91560170235,0.0528805868039,0.76596272046,0.0,-0.151053926692,-0.215879303609,0.220253691888]
    Intercept: 0.159893684424
    numIterations: 7
    objectiveHistory: [0.49999999999999994, 0.4967620357443381, 0.4936361664340463, 0.4936351537897608, 0.4936351214177871, 0.49363512062528014, 0.4936351206216114]
    +--------------------+
    |           residuals|
    +--------------------+
    |  -9.889232683103197|
    |  0.5533794340053554|
    |  -5.204019455758823|
    | -20.566686715507508|
    |    -9.4497405180564|
    |  -6.909112502719486|
    |  -10.00431602969873|
    |   2.062397807050484|
    |  3.1117508432954772|
    | -15.893608229419382|
    |  -5.036284254673026|
    |   6.483215876994333|
    |  12.429497299109002|
    |  -20.32003219007654|
    | -2.0049838218725005|
    | -17.867901734183793|
    |   7.646455887420495|
    | -2.2653482182417406|
    |-0.10308920436195645|
    |  -1.380034070385301|
    +--------------------+
    only showing top 20 rows
    
    RMSE: 10.189077
    r2: 0.022861
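
    As a hedged aside that is not in the original example: elasticNetParam controls the L1/L2 mix (0.0 is pure L2 / ridge, 1.0 is pure L1 / lasso), and the fitted model can score rows with transform(). A minimal sketch, assuming it runs in the same session as above, before spark.stop():

    # Hedged sketch: pure ridge (L2) and pure lasso (L1) variants of the estimator above
    ridge = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.0)
    lasso = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=1.0)

    # transform() appends a "prediction" column; applied to the training data here
    # purely for illustration
    lrModel.transform(training).select("label", "prediction").show(5)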

      

    1. Generalized Linear Model (GLM)

    from __future__ import print_function
    from pyspark.sql import SparkSession
    from pyspark.ml.regression import GeneralizedLinearRegression
    
    
    spark = SparkSession \
        .builder \
        .appName("GeneralizedLinearRegressionExample") \
        .getOrCreate()

    # Load the data
    dataset = spark.read.format("libsvm") \
        .load("data/mllib/sample_linear_regression_data.txt")
    
    glr = GeneralizedLinearRegression(family="gaussian", link="identity", maxIter=10, regParam=0.3)
    
    # Fit the model
    model = glr.fit(dataset)
    
    # Print the coefficients and intercept
    print("Coefficients: " + str(model.coefficients))
    print("Intercept: " + str(model.intercept))
    
    # Summarize the model over the training set and print some metrics
    summary = model.summary
    print("Coefficient Standard Errors: " + str(summary.coefficientStandardErrors))
    print("T Values: " + str(summary.tValues))
    print("P Values: " + str(summary.pValues))
    print("Dispersion: " + str(summary.dispersion))
    print("Null Deviance: " + str(summary.nullDeviance))
    print("Residual Degree Of Freedom Null: " + str(summary.residualDegreeOfFreedomNull))
    print("Deviance: " + str(summary.deviance))
    print("Residual Degree Of Freedom: " + str(summary.residualDegreeOfFreedom))
    print("AIC: " + str(summary.aic))
    print("Deviance Residuals: ")
    summary.residuals().show()
    
    spark.stop()
    

    Output:

    Coefficients: [0.0105418280813,0.800325310056,-0.784516554142,2.36798871714,0.501000208986,1.12223511598,-0.292682439862,-0.498371743232,-0.603579718068,0.672555006719]
    Intercept: 0.145921761452
    Coefficient Standard Errors: [0.7950428434287478, 0.8049713176546897, 0.7975916824772489, 0.8312649247659919, 0.7945436200517938, 0.8118992572197593, 0.7919506385542777, 0.7973378214726764, 0.8300714999626418, 0.7771333489686802, 0.463930109648428]
    T Values: [0.013259446542269243, 0.9942283563442594, -0.9836067393599172, 2.848657084633759, 0.6305509179635714, 1.382234441029355, -0.3695715687490668, -0.6250446546128238, -0.7271418403049983, 0.8654306337661122, 0.31453393176593286]
    P Values: [0.989426199114056, 0.32060241580811044, 0.3257943227369877, 0.004575078538306521, 0.5286281628105467, 0.16752945248679119, 0.7118614002322872, 0.5322327097421431, 0.467486325282384, 0.3872259825794293, 0.753249430501097]
    Dispersion: 105.609883568
    Null Deviance: 53229.3654339
    Residual Degree Of Freedom Null: 500
    Deviance: 51748.8429484
    Residual Degree Of Freedom: 490
    AIC: 3769.18958718
    Deviance Residuals: 
    +-------------------+
    |  devianceResiduals|
    +-------------------+
    |-10.974359174246889|
    | 0.8872320138420559|
    | -4.596541837478908|
    |-20.411667435019638|
    |-10.270419345342642|
    |-6.0156058956799905|
    |-10.663939415849267|
    | 2.1153960525024713|
    | 3.9807132379137675|
    |-17.225218272069533|
    | -4.611647633532147|
    | 6.4176669407698546|
    | 11.407137945300537|
    | -20.70176540467664|
    | -2.683748540510967|
    |-16.755494794232536|
    |  8.154668342638725|
    |-1.4355057987358848|
    |-0.6435058688185704|
    |  -1.13802589316832|
    +-------------------+
    only showing top 20 rows
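
    As a hedged aside not in the original post: GeneralizedLinearRegression also supports other family/link pairs. The sketch below only configures a Poisson regression with a log link; fitting it would need a dataset with non-negative count labels (count_data below is a hypothetical name):

    # Hedged sketch: Poisson GLM configuration for count targets
    poisson_glr = GeneralizedLinearRegression(family="poisson", link="log",
                                              maxIter=10, regParam=0.3)
    # count_data is hypothetical -- any DataFrame with non-negative integer labels:
    # poisson_model = poisson_glr.fit(count_data)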

    2. Logistic Regression

    from __future__ import print_function
    from pyspark.ml.classification import LogisticRegression
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
        .builder \
        .appName("LogisticRegressionSummary") \
        .getOrCreate()

    # Load the data
    training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
    
    lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
    
    # Fit the model
    lrModel = lr.fit(training)
    
    # Summarize the model over the training set
    trainingSummary = lrModel.summary
    
    # Print the objective (loss) value per training iteration
    objectiveHistory = trainingSummary.objectiveHistory
    print("objectiveHistory:")
    for objective in objectiveHistory:
        print(objective)
    
    # ROC curve and area under the ROC
    trainingSummary.roc.show()
    print("areaUnderROC: " + str(trainingSummary.areaUnderROC))
    
    # Set the model threshold to maximize F-Measure
    fMeasure = trainingSummary.fMeasureByThreshold
    maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()
    bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)']) \
        .select('threshold').head()['threshold']
    lr.setThreshold(bestThreshold)
    
    spark.stop()
    

    Output:

    objectiveHistory:
    0.683314913574
    0.666287575147
    0.621706854603
    0.612726524589
    0.60603479868
    0.603175068757
    0.596962153484
    0.594074303198
    0.590608924334
    0.589472457649
    0.588218777573
    +---+--------------------+
    |FPR|                 TPR|
    +---+--------------------+
    |0.0|                 0.0|
    |0.0|0.017543859649122806|
    |0.0| 0.03508771929824561|
    |0.0| 0.05263157894736842|
    |0.0| 0.07017543859649122|
    |0.0| 0.08771929824561403|
    |0.0| 0.10526315789473684|
    |0.0| 0.12280701754385964|
    |0.0| 0.14035087719298245|
    |0.0| 0.15789473684210525|
    |0.0| 0.17543859649122806|
    |0.0| 0.19298245614035087|
    |0.0| 0.21052631578947367|
    |0.0| 0.22807017543859648|
    |0.0| 0.24561403508771928|
    |0.0|  0.2631578947368421|
    |0.0|  0.2807017543859649|
    |0.0|  0.2982456140350877|
    |0.0|  0.3157894736842105|
    |0.0|  0.3333333333333333|
    +---+--------------------+
    only showing top 20 rows
    
    areaUnderROC: 1.0

    A second example fits both a binomial and a multinomial (family="multinomial") logistic regression with elastic-net regularization on the same data:

    from __future__ import print_function
    from pyspark.ml.classification import LogisticRegression
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
        .builder \
        .appName("LogisticRegressionWithElasticNet") \
        .getOrCreate()

    # Load the data
    training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
    
    lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
    
    # Fit the model
    lrModel = lr.fit(training)
    
    # Coefficients and intercept
    print("Coefficients: " + str(lrModel.coefficients))
    print("Intercept: " + str(lrModel.intercept))
    
    # Multinomial logistic regression
    mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")
    
    # Fit the model
    mlrModel = mlr.fit(training)
    
    # Print the coefficient matrix and intercept vector
    print("Multinomial coefficients: " + str(mlrModel.coefficientMatrix))
    print("Multinomial intercepts: " + str(mlrModel.interceptVector))
    
    spark.stop()
    

    Output:

    Coefficients: (692,[244,263,272,300,301,328,350,351,378,379,405,406,407,428,433,434,455,456,461,462,483,484,489,490,496,511,512,517,539,540,568],[-7.35398352419e-05,-9.10273850559e-05,-0.000194674305469,-0.000203006424735,-3.14761833149e-05,-6.84297760266e-05,1.58836268982e-05,1.40234970914e-05,0.00035432047525,0.000114432728982,0.000100167123837,0.00060141093038,0.000284024817912,-0.000115410847365,0.000385996886313,0.000635019557424,-0.000115064123846,-0.00015271865865,0.000280493380899,0.000607011747119,-0.000200845966325,-0.000142107557929,0.000273901034116,0.00027730456245,-9.83802702727e-05,-0.000380852244352,-0.000253151980086,0.000277477147708,-0.000244361976392,-0.00153947446876,-0.000230733284113])
    Intercept: 0.224563159613
    Multinomial coefficients: DenseMatrix([[ 0.,  0.,  0., ...,  0.,  0.,  0.],
                 [ 0.,  0.,  0., ...,  0.,  0.,  0.]])
    Multinomial intercepts: [-0.120658794459,0.120658794459]
    
    

    3. Multiclass Logistic Regression

    from __future__ import print_function
    from pyspark.ml.classification import LogisticRegression
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
        .builder \
        .appName("MulticlassLogisticRegressionWithElasticNet") \
        .getOrCreate()

    # Load the data
    training = spark \
        .read \
        .format("libsvm") \
        .load("data/mllib/sample_multiclass_classification_data.txt")
    
    lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
    
    # Fit the model
    lrModel = lr.fit(training)
    
    # Print the coefficient matrix and intercept vector
    print("Coefficients: \n" + str(lrModel.coefficientMatrix))
    print("Intercept: " + str(lrModel.interceptVector))
    
    # Show predictions on the training data
    lrModel.transform(training).show()
    
    spark.stop()
    

    Output:

    Coefficients: 
    DenseMatrix([[ 0.        ,  0.        ,  0.        ,  0.31764832],
                 [ 0.        ,  0.        , -0.78039435, -0.37696114],
                 [ 0.        ,  0.        ,  0.        ,  0.        ]])
    Intercept: [0.0516523165983,-0.123912249909,0.0722599333102]
    +-----+--------------------+--------------------+--------------------+----------+
    |label|            features|       rawPrediction|         probability|prediction|
    +-----+--------------------+--------------------+--------------------+----------+
    |  1.0|(4,[0,1,2,3],[-0....|[-0.2130545101220...|[0.19824091021950...|       1.0|
    |  1.0|(4,[0,1,2,3],[-0....|[-0.2395254151479...|[0.18250386256254...|       1.0|
    |  1.0|(4,[0,1,2,3],[-0....|[-0.2130545101220...|[0.18980556250236...|       1.0|
    |  1.0|(4,[0,1,2,3],[-0....|[-0.2395254151479...|[0.19632523546632...|       1.0|
    |  0.0|(4,[0,1,2,3],[0.1...|[0.21047647616023...|[0.43750398183438...|       0.0|
    |  1.0|(4,[0,2,3],[-0.83...|[-0.2395254151479...|[0.18250386256254...|       1.0|
    |  2.0|(4,[0,1,2,3],[-1....|[0.07812299927036...|[0.37581775428218...|       0.0|
    |  2.0|(4,[0,1,2,3],[-1....|[0.05165230377890...|[0.35102739153795...|       2.0|
    |  1.0|(4,[0,1,2,3],[-0....|[-0.2659960025254...|[0.17808226409449...|       1.0|
    |  0.0|(4,[0,2,3],[0.611...|[0.18400588878268...|[0.44258017540583...|       0.0|
    |  0.0|(4,[0,1,2,3],[0.2...|[0.23694706353777...|[0.44442301486604...|       0.0|
    |  1.0|(4,[0,1,2,3],[-0....|[-0.2659960025254...|[0.17539206930356...|       1.0|
    |  1.0|(4,[0,1,2,3],[-0....|[-0.2395254151479...|[0.18250386256254...|       1.0|
    |  2.0|(4,[0,1,2,3],[-0....|[0.05165230377890...|[0.35371124645092...|       2.0|
    |  2.0|(4,[0,1,2,3],[-0....|[-0.0277597631826...|[0.32360705108265...|       2.0|
    |  2.0|(4,[0,1,2,3],[-0....|[0.02518163392628...|[0.33909561029444...|       2.0|
    |  1.0|(4,[0,2,3],[-0.94...|[-0.2395254151479...|[0.17976563656243...|       1.0|
    |  2.0|(4,[0,1,2,3],[-0....|[-0.0012891758050...|[0.32994371314262...|       2.0|
    |  0.0|(4,[0,1,2,3],[0.1...|[0.10459380900173...|[0.39691355784123...|       0.0|
    |  2.0|(4,[0,1,2,3],[-0....|[0.02518163392628...|[0.34718685710751...|       2.0|
    +-----+--------------------+--------------------+--------------------+----------+
    only showing top 20 rows
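
    As a hedged aside: the DataFrame shown above already carries a prediction column, so training-set accuracy can be computed with MulticlassClassificationEvaluator. A minimal sketch, assuming it runs in the same session as above, before spark.stop():

    from pyspark.ml.evaluation import MulticlassClassificationEvaluator

    # Hedged sketch: score the predictions from the example above
    predictions = lrModel.transform(training)
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                                  metricName="accuracy")
    print("Training accuracy = %g" % evaluator.evaluate(predictions))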


    4. Multilayer Perceptron (MLP)

    from __future__ import print_function
    from pyspark.ml.classification import MultilayerPerceptronClassifier
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
        .builder.appName("multilayer_perceptron_classification_example").getOrCreate()

    # Load the data
    data = spark.read.format("libsvm") \
        .load("data/mllib/sample_multiclass_classification_data.txt")
    
    # Split into training and test sets
    splits = data.randomSplit([0.6, 0.4], 1234)
    train = splits[0]
    test = splits[1]
    
    # Layer sizes: 4 inputs, two hidden layers (5 and 4 units), 3 output classes
    layers = [4, 5, 4, 3]
    
    # Create the multilayer perceptron trainer
    trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)
    
    # Train the model
    model = trainer.fit(train)
    
    # Make predictions and compute test-set accuracy
    result = model.transform(test)
    result.show()
    predictionAndLabels = result.select("prediction", "label")
    evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
    print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))
    
    spark.stop()
    

    Output:

    +-----+--------------------+----------+
    |label|            features|prediction|
    +-----+--------------------+----------+
    |  0.0|(4,[0,1,2,3],[-0....|       2.0|
    |  0.0|(4,[0,1,2,3],[-0....|       0.0|
    |  0.0|(4,[0,1,2,3],[-0....|       0.0|
    |  0.0|(4,[0,1,2,3],[-0....|       2.0|
    |  0.0|(4,[0,1,2,3],[-0....|       2.0|
    |  0.0|(4,[0,1,2,3],[-1....|       2.0|
    |  0.0|(4,[0,1,2,3],[0.1...|       0.0|
    |  0.0|(4,[0,1,2,3],[0.2...|       0.0|
    |  0.0|(4,[0,1,2,3],[0.3...|       0.0|
    |  0.0|(4,[0,1,2,3],[0.3...|       0.0|
    |  0.0|(4,[0,1,2,3],[0.3...|       0.0|
    |  0.0|(4,[0,1,2,3],[0.4...|       0.0|
    |  0.0|(4,[0,1,2,3],[0.5...|       0.0|
    |  0.0|(4,[0,1,2,3],[0.7...|       0.0|
    |  0.0|(4,[0,1,2,3],[0.8...|       0.0|
    |  0.0|(4,[0,1,2,3],[1.0...|       0.0|
    |  0.0|(4,[0,2,3],[0.166...|       0.0|
    |  0.0|(4,[0,2,3],[0.388...|       0.0|
    |  1.0|(4,[0,1,2,3],[-0....|       1.0|
    |  1.0|(4,[0,1,2,3],[-0....|       1.0|
    +-----+--------------------+----------+
    only showing top 20 rows
    
    Test set accuracy = 0.901960784314
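
    As a hedged aside: in the layers list the first entry must match the feature dimension (4 here) and the last the number of classes (3). The evaluator also supports metrics other than accuracy, e.g. weighted F1; a minimal sketch, assuming the same session as above, before spark.stop():

    # Hedged sketch: weighted F1 on the same prediction/label columns as above
    f1_evaluator = MulticlassClassificationEvaluator(metricName="f1")
    print("Test set F1 = " + str(f1_evaluator.evaluate(predictionAndLabels)))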


    5. Decision Tree Classification

    from __future__ import print_function
    from pyspark.ml import Pipeline
    from pyspark.ml.classification import DecisionTreeClassifier
    from pyspark.ml.feature import StringIndexer, VectorIndexer
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
        .builder \
        .appName("DecisionTreeClassificationExample") \
        .getOrCreate()

    # Load the data
    data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
    
    # Index labels, adding metadata to the label column.
    # Fit on whole dataset to include all labels in index.
    labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
    # Automatically identify categorical features, and index them.
    # We specify maxCategories so features with > 4 distinct values are treated as continuous.
    featureIndexer = \
        VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
    
    # Split the data into training and test sets (30% held out for testing)
    (trainingData, testData) = data.randomSplit([0.7, 0.3])
    
    # Train a DecisionTree model.
    dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
    
    # Chain indexers and tree in a Pipeline
    pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])
    
    # Train model.  This also runs the indexers.
    model = pipeline.fit(trainingData)
    
    # Make predictions.
    predictions = model.transform(testData)
    
    # Select example rows to display.
    predictions.select("prediction", "indexedLabel", "features").show(5)
    
    # Select (prediction, true label) and compute test error
    evaluator = MulticlassClassificationEvaluator(
        labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)
    print("Test Error = %g " % (1.0 - accuracy))
    
    treeModel = model.stages[2]
    # summary only
    print(treeModel)
    
    spark.stop()
    

    Output:

    +----------+------------+--------------------+
    |prediction|indexedLabel|            features|
    +----------+------------+--------------------+
    |       1.0|         1.0|(692,[98,99,100,1...|
    |       1.0|         1.0|(692,[100,101,102...|
    |       1.0|         1.0|(692,[123,124,125...|
    |       1.0|         1.0|(692,[124,125,126...|
    |       1.0|         1.0|(692,[125,126,127...|
    +----------+------------+--------------------+
    only showing top 5 rows
    
    Test Error = 0.0333333 
    DecisionTreeClassificationModel (uid=DecisionTreeClassifier_4bf3a2017c8143b08d57) of depth 1 with 3 nodes
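
    As a hedged aside: print(treeModel) only shows the one-line summary above; the full learned tree (splits and leaf predictions) can be inspected via toDebugString. A minimal sketch, assuming the same session as above, before spark.stop():

    # Hedged sketch: print the full tree structure
    print(treeModel.toDebugString)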