zoukankan      html  css  js  c++  java
  • 用Spark做求平均成绩算法

    ##由于才开始学,此做法为只为结果,不为过程型

    实验数据:

    math.txt:      English.txt:

    Ben 98       Ben 89

    Bean 99       Bean 98

    Harry 89       Harry 78

    Sam 79       Sam 87

    Tom 80       Tom 80

     

    from pyspark import SparkContext
    #定义的函数,用于把两个文件的list合并转换成(Tome,80)形式
    def TurnDict(dict1):
    keys = []
    values =[]
    for i in range(0,len(dict1)):
    if i % 2 != 0:
    values.append(int(dict1[i]))
    else:
    keys.append(dict1[i])
    dict2 = zip(keys,values)
    return dict2

    sc = SparkContext('local','AvGrade')
    #创建RDD
    rdd1 = sc.textFile("file:///usr/local/spark/mycode/TestPackage/math.txt")
    rdd2 = sc.textFile("file:///usr/local/spark/mycode/TestPackage/English.txt")
    #对与数据流按" "切割,并且调用map函数转换成(Tom ,1)形式,在接着用keys把key值取出来,再用collect()单独为一个列表
    pairRDD1 = rdd1.flatMap(lambda line : (line.split(" ")[0],line.split(" ")[1])).map(lambda x : (x,1)).keys().collect()
    pairRDD2 = rdd2.flatMap(lambda line : (line.split(" ")[0],line.split(" ")[1])).map(lambda x : (x,1)).keys().collect()
    把字典转换成DataFrame
    pairRDD_1 = sc.parallelize(TurnDict(pairRDD1))
    pairRDD_2 = sc.parallelize(TurnDict(pairRDD2))
    '''
    pairRDD = pairRDD_1.join(pairRDD_2)
    pairRDD.reduceByKey(lambda x,y : (x+y)).foreach(print)
    '''
    #把两个RDD里面的值合并为一个列表
    pairRDD_1_N = pairRDD_1.collect()
    pairRDD_2_N = pairRDD_2.collect()
    for i in pairRDD_1_N:
    pairRDD_2_N.append(i)
     
    #再转换成RDD
    pairRDD = sc.parallelize(pairRDD_2_N)
    #调用reducByKey进行计算
    result =pairRDD.reduceByKey(lambda x,y : ((x+y)/2))
    #打印值
    result.foreach(print)
  • 相关阅读:
    滴滴日送400万红包,仅仅为人群不冷漠?
    C++提供的四种新式转换--const_cast dynamic_cast reinterpret_cast static_cast
    GreenDao开源ORM框架浅析
    Python 计数器
    Linux虚拟内存的添加
    Linux iptables
    Python set
    Python dict get items pop update
    Python contains
    Python reverse
  • 原文地址:https://www.cnblogs.com/SoftwareBuilding/p/9407197.html
Copyright © 2011-2022 走看看