  • Python + Spark 2.0 + Hadoop Study Notes: PySpark MLlib Logistic Regression for Binary Classification

    As in the previous part, logistic regression is another commonly used method for binary classification. Its core idea is the sigmoid function, which squashes a linear combination of the features into a probability that is then thresholded to produce a class label. The rest of this post shows how to implement logistic regression with MLlib in PySpark.
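    As a quick illustration of the idea (not part of the original program), the sketch below shows how a logistic regression model scores a single sample: the linear score w·x + b is squashed by the sigmoid into a probability, and the class is chosen by thresholding at 0.5. The weights, bias and feature values are made up for demonstration.

    import numpy as np

    def sigmoid(z):
        # squash a real-valued score into the (0, 1) range
        return 1.0 / (1.0 + np.exp(-z))

    # hypothetical weights, bias and feature vector, for illustration only
    w = np.array([0.8, -1.2, 0.5])
    b = 0.1
    x = np.array([1.0, 0.3, 2.0])

    prob = sigmoid(np.dot(w, x) + b)    # probability of the positive class
    label = 1 if prob >= 0.5 else 0     # binary decision at the 0.5 threshold
    print(prob, label)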

    Step 1: Import the required libraries

    import sys
    from time import time
    import pandas as pd
    import matplotlib.pyplot as plt
    from pyspark import SparkConf, SparkContext
    from pyspark.mllib.classification import LogisticRegressionWithSGD
    from pyspark.mllib.regression import LabeledPoint
    import numpy as np
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.mllib.feature import StandardScaler

    Step 2: Prepare the data. Unlike the earlier tree-based models, logistic regression is sensitive to the very different value ranges of the input features, so the data has to be standardized first; this is done with StandardScaler().
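    To make the effect of standardization concrete, the toy sketch below (not part of the original code, and assuming a SparkContext named sc already exists) standardizes a small RDD of dense vectors whose columns have very different scales; PrepareData later does the same thing to the real feature RDD.

    from pyspark.mllib.feature import StandardScaler

    # made-up feature vectors whose two columns have very different ranges
    features = sc.parallelize([[1.0, 2000.0], [2.0, 3000.0], [3.0, 1000.0]])
    scaler = StandardScaler(withMean=True, withStd=True).fit(features)
    for v in scaler.transform(features).collect():
        print(v)   # each column now has zero mean and unit standard deviation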

    def get_mapping(rdd, idx):
        # build a dictionary mapping each distinct category value to an index
        return rdd.map(lambda fields: fields[idx]).distinct().zipWithIndex().collectAsMap()

    def extract_label(record):
        # the label is the last field of each record
        label = record[-1]
        return float(label)

    def extract_features(field, categoriesMap, featureEnd):
        # one-hot encode the categorical field (column 3) ...
        categoryIdx = categoriesMap[field[3]]
        categoryFeatures = np.zeros(len(categoriesMap))
        categoryFeatures[categoryIdx] = 1
        # ... and append the numerical fields, converting "?" to 0
        numericalFeatures = [convert_float(x) for x in field[4:featureEnd]]
        return np.concatenate((categoryFeatures, numericalFeatures))

    def convert_float(x):
        return 0 if x == "?" else float(x)

    def PrepareData(sc):
        print("Data loading...")
        rawDataWithHeader = sc.textFile(Path + "data/train.tsv")
        header = rawDataWithHeader.first()
        rawData = rawDataWithHeader.filter(lambda x: x != header)   # drop the header line
        rData = rawData.map(lambda x: x.replace("\"", ""))          # strip quotation marks
        lines = rData.map(lambda x: x.split("\t"))                  # the file is tab-separated
        print("The number of data: " + str(lines.count()))
        print("Before normalization:")
        categoriesMap = lines.map(lambda fields: fields[3]) \
                             .distinct().zipWithIndex().collectAsMap()
        labelRDD = lines.map(lambda r: extract_label(r))
        featureRDD = lines.map(lambda r: extract_features(r, categoriesMap, len(r) - 1))
        for i in featureRDD.first():
            print(str(i) + ",", end="")
        print("")
        print("After normalization:")
        stdScaler = StandardScaler(withMean=True, withStd=True).fit(featureRDD)
        ScalerFeatureRDD = stdScaler.transform(featureRDD)
        for i in ScalerFeatureRDD.first():
            print(str(i) + ",", end="")
        labelpoint = labelRDD.zip(ScalerFeatureRDD)
        labelpointRDD = labelpoint.map(lambda r: LabeledPoint(r[0], r[1]))
        # split into training, validation and test sets in an 8:1:1 ratio
        (trainData, validationData, testData) = labelpointRDD.randomSplit([8, 1, 1])
        print("trainData:" + str(trainData.count()) +
              " validationData:" + str(validationData.count()) +
              " testData:" + str(testData.count()))
        return (trainData, validationData, testData, categoriesMap)
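    To see what extract_features produces, here is a hypothetical call outside Spark; the record, the category map and the field layout are made up for illustration. The result is the one-hot category vector followed by the numerical fields, with "?" converted to 0.

    # made-up record: url, id, boilerplate, category, two numeric fields, label
    record = ["http://example.com", "1", "{}", "business", "0.79", "?", "1"]
    toyCategoriesMap = {"business": 0, "recreation": 1}
    print(extract_features(record, toyCategoriesMap, len(record) - 1))
    # one-hot for "business", then 0.79 and the "?" converted to 0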

    Step 3: Use the trained model to make predictions on the test set

    def PredictData(sc, model, categoriesMap):
        print("Data loading...")
        rawDataWithHeader = sc.textFile(Path + "data/test.tsv")
        header = rawDataWithHeader.first()
        rawData = rawDataWithHeader.filter(lambda x: x != header)
        rData = rawData.map(lambda x: x.replace("\"", ""))
        lines = rData.map(lambda x: x.split("\t"))
        print("The number of data: " + str(lines.count()))
        # the test file has no label, so every field up to the end is a feature
        dataRDD = lines.map(lambda r: (r[0],
                            extract_features(r, categoriesMap, len(r))))
        DescDict = {
            0: "ephemeral",
            1: "evergreen"
        }
        for data in dataRDD.take(10):
            predictResult = model.predict(data[1])
            print("Web: " + str(data[0]) +
                  "  Prediction: " + str(predictResult) +
                  "  Illustration: " + DescDict[predictResult])

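    One optional aside, not part of the original program: the MLlib model returned by LogisticRegressionWithSGD.train applies a 0.5 threshold internally, so predict() returns hard 0/1 labels. If the raw sigmoid probabilities are wanted instead, the model exposes clearThreshold(). The sketch below assumes a trained model and the validationData RDD from PrepareData.

    model.clearThreshold()                 # predict() now returns probabilities
    probabilities = model.predict(validationData.map(lambda p: p.features))
    print(probabilities.take(5))
    model.setThreshold(0.5)                # restore 0/1 label predictions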
    Step 4: Train and evaluate the model (the parameters to tune are numIterations, stepSize and miniBatchFraction)
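    These three parameters map onto the keyword arguments iterations, step and miniBatchFraction of LogisticRegressionWithSGD.train. For reference, a single training run with the keywords spelled out would look like the sketch below; the values are only examples.

    # a single training run with the tuning parameters passed as keywords
    model = LogisticRegressionWithSGD.train(trainData,
                                            iterations=15,
                                            step=10,
                                            miniBatchFraction=0.5)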

    def evaluateModel(model, validationData):
        # compute the AUC of the model on the validation set
        score = model.predict(validationData.map(lambda p: p.features))
        score = score.map(lambda s: float(s))
        Labels = validationData.map(lambda p: p.label)
        Labels = Labels.map(lambda l: float(l))
        scoreAndLabels = score.zip(Labels)
        metrics = BinaryClassificationMetrics(scoreAndLabels)
        AUC = metrics.areaUnderROC
        return AUC
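    As a sanity check of what evaluateModel computes, BinaryClassificationMetrics can be tried on a tiny hand-made RDD of (score, label) pairs (assuming an existing SparkContext sc); a perfectly ranked set of scores gives an AUC of 1.0.

    # toy (score, label) pairs, purely for illustration
    scoreAndLabels = sc.parallelize([(0.9, 1.0), (0.8, 1.0), (0.3, 0.0), (0.1, 0.0)])
    metrics = BinaryClassificationMetrics(scoreAndLabels)
    print(metrics.areaUnderROC)   # 1.0 for this perfectly separated toy set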

    def trainEvaluateModel(trainData, validationData,
                           numIterations, stepSize, miniBatchFraction):
        startTime = time()
        model = LogisticRegressionWithSGD.train(trainData, numIterations,
                                                stepSize, miniBatchFraction)
        AUC = evaluateModel(model, validationData)
        duration = time() - startTime
        print(" numIterations=" + str(numIterations) +
              " stepSize=" + str(stepSize) +
              " miniBatchFraction=" + str(miniBatchFraction) +
              " Time=" + str(duration) +
              " AUC=" + str(AUC))
        return (AUC, duration, numIterations, stepSize, miniBatchFraction, model)

    def evalParameter(trainData, validationData, evalparm,
                      numIterationsList, stepSizeList, miniBatchFractionList):
        # train a model for every combination of the parameter lists
        metrics = [trainEvaluateModel(trainData, validationData,
                                      numIterations, stepSize, miniBatchFraction)
                   for numIterations in numIterationsList
                   for stepSize in stepSizeList
                   for miniBatchFraction in miniBatchFractionList]
        if evalparm == "numIterations":
            IndexList = numIterationsList[:]
        elif evalparm == "stepSize":
            IndexList = stepSizeList[:]
        elif evalparm == "miniBatchFraction":
            IndexList = miniBatchFractionList[:]
        df = pd.DataFrame(metrics, index=IndexList,
                          columns=['AUC', 'duration', 'numIterations',
                                   'stepSize', 'miniBatchFraction', 'model'])
        showchart(df, evalparm, 'AUC', 'duration', 0.5, 0.7)

    def showchart(df, evalparm, barData, lineData, yMin, yMax):
        # bar chart of AUC with the training duration overlaid as a line
        ax = df[barData].plot(kind='bar', title=evalparm, figsize=(10, 6),
                              legend=True, fontsize=12)
        ax.set_xlabel(evalparm, fontsize=12)
        ax.set_ylim([yMin, yMax])
        ax.set_ylabel(barData, fontsize=12)
        ax2 = ax.twinx()
        ax2.plot(df[[lineData]].values, linestyle='-', marker='o',
                 linewidth=2.0, color='r')
        plt.show()

    def evalAllParameter(trainData, validationData,
                         numIterationsList, stepSizeList, miniBatchFractionList):
        # grid search over all parameter combinations and return the best model
        metrics = [trainEvaluateModel(trainData, validationData,
                                      numIterations, stepSize, miniBatchFraction)
                   for numIterations in numIterationsList
                   for stepSize in stepSizeList
                   for miniBatchFraction in miniBatchFractionList]
        Smetrics = sorted(metrics, key=lambda k: k[0], reverse=True)
        bestParameter = Smetrics[0]
        print("Best parameter: numIterations:" + str(bestParameter[2]) +
              " ,stepSize:" + str(bestParameter[3]) +
              " ,miniBatchFraction:" + str(bestParameter[4]) +
              " ,AUC=" + str(bestParameter[0]))
        return bestParameter[5]

    def parametersEval(trainData, validationData):
        # evaluate each parameter in isolation while holding the others fixed
        print("numIterations")
        evalParameter(trainData, validationData, "numIterations",
                      numIterationsList=[5, 15, 20, 60, 100],
                      stepSizeList=[10],
                      miniBatchFractionList=[1])
        print("stepSize")
        evalParameter(trainData, validationData, "stepSize",
                      numIterationsList=[100],
                      stepSizeList=[10, 50, 100, 200],
                      miniBatchFractionList=[1])
        print("miniBatchFraction")
        evalParameter(trainData, validationData, "miniBatchFraction",
                      numIterationsList=[100],
                      stepSizeList=[100],
                      miniBatchFractionList=[0.5, 0.8, 1])

    Step 5: Spark-related setup

    def SetLogger(sc):
        # silence Spark's verbose INFO logging
        logger = sc._jvm.org.apache.log4j
        logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
        logger.LogManager.getLogger("akka").setLevel(logger.Level.ERROR)
        logger.LogManager.getRootLogger().setLevel(logger.Level.ERROR)

    def SetPath(sc):
        # choose the data path depending on whether Spark runs locally or on a cluster
        global Path
        if sc.master[0:5] == "local":
            Path = "file:/home/jorlinlee/pythonsparkexample/PythonProject/"
        else:
            Path = "hdfs://master:9000/user/jorlinlee/"

    def CreateSparkContext():
        conf = SparkConf().setMaster("local[*]").setAppName("LR")
        sc = SparkContext(conf=conf)
        print("master=" + sc.master)
        SetLogger(sc)
        SetPath(sc)
        return sc

    Step 6: Run the main program

    if __name__ == "__main__":
        print("LR")
        sc = CreateSparkContext()
        print("Data preparing")
        (trainData, validationData, testData, categoriesMap) = PrepareData(sc)
        trainData.persist(); validationData.persist(); testData.persist()
        print("Evaluating")
        (AUC, duration, numIterationsParm, stepSizeParm,
         miniBatchFractionParm, model) = trainEvaluateModel(trainData, validationData, 15, 10, 0.5)
        if (len(sys.argv) == 2) and (sys.argv[1] == "-e"):
            # "-e": evaluate each parameter separately and plot the results
            parametersEval(trainData, validationData)
        elif (len(sys.argv) == 2) and (sys.argv[1] == "-a"):
            # "-a": grid search over all parameter combinations
            print("Best parameters")
            model = evalAllParameter(trainData, validationData,
                                     [3, 5, 10, 15],
                                     [10, 50, 100],
                                     [0.5, 0.8, 1])
        print("test")
        auc = evaluateModel(model, testData)
        print("Test AUC: " + str(auc))
        print("Predict")
        PredictData(sc, model, categoriesMap)
        sc.stop()
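    After PredictData runs (and before the SparkContext is stopped), the fitted coefficients can be inspected directly on the model object. This is an optional sketch, not part of the original program.

    # optional: inspect the fitted logistic regression coefficients
    print("intercept: " + str(model.intercept))
    print("number of weights: " + str(len(model.weights)))
    print("first weights: " + str(model.weights.toArray()[:5]))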

    Results:

    Predict
    Data loading...
    The number of data: 3171
    Web: http://www.lynnskitchenadventures.com/2009/04/homemade-enchilada-sauce.html
    Prediction: 0  Illustration: ephemeral

    Web: http://lolpics.se/18552-stun-grenade-ar
    Prediction: 0  Illustration: ephemeral

    Web: http://www.xcelerationfitness.com/treadmills.html
    Prediction: 0  Illustration: ephemeral

    Web: http://www.bloomberg.com/news/2012-02-06/syria-s-assad-deploys-tactics-of-father-to-crush-revolt-threatening-reign.html
    Prediction: 0  Illustration: ephemeral

    Web: http://www.wired.com/gadgetlab/2011/12/stem-turns-lemons-and-limes-into-juicy-atomizers/
    Prediction: 0  Illustration: ephemeral

    Web: http://www.latimes.com/health/boostershots/la-heb-fat-tax-denmark-20111013,0,2603132.story
    Prediction: 0  Illustration: ephemeral

    Web: http://www.howlifeworks.com/a/a?AG_ID=1186&cid=7340ci
    Prediction: 0  Illustration: ephemeral

    Web: http://romancingthestoveblog.wordpress.com/2010/01/13/sweet-potato-ravioli-with-lemon-sage-brown-butter-sauce/
    Prediction: 0  Illustration: ephemeral

    Web: http://www.funniez.net/Funny-Pictures/turn-men-down.html
    Prediction: 0  Illustration: ephemeral

    Web: http://youfellasleepwatchingadvd.com/
    Prediction: 0  Illustration: ephemeral
