  • Spark Word Count, Finding TOP Values, File Sorting, and Secondary Sorting

    RDD Operations

    Word Count

    lines = sc.textFile("file:///usr/local/spark/word.txt")
    wordCount = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
    print(wordCount.collect())
    # [('good', 1), ('Spark', 2), ('hadoop', 1)]
    # collect() is an action that returns the result set to the driver as a list
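
    The same counts can also be obtained without an explicit reduceByKey by using the countByValue() action on the flattened words (a minimal sketch, assuming the same word.txt as above):

    lines = sc.textFile("file:///usr/local/spark/word.txt")
    words = lines.flatMap(lambda line: line.split(" "))
    # countByValue() is an action: it returns a {word: count} dict to the driver
    print(words.countByValue())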
    

    Pair RDD Operations

    Compute the average daily sales of each book

    rdd = sc.parallelize([("spark",2),("hadoop",6),("hadoop",4),("spark",6)])
    rdd.mapValues(lambda x : (x,1)).reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])).mapValues(lambda x:x[0]/x[1]).collect()
    # Output: [('hadoop', 5.0), ('spark', 4.0)]
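
    The mapValues/reduceByKey pattern above pairs each sales value with a count of 1 and then sums both fields. The same average can be written with combineByKey, which makes the (sum, count) accumulation explicit; this is a minimal equivalent sketch over the same in-memory data:

    rdd = sc.parallelize([("spark", 2), ("hadoop", 6), ("hadoop", 4), ("spark", 6)])
    # build (sum, count) pairs per key, then divide to get the average
    sums = rdd.combineByKey(lambda v: (v, 1),
                            lambda acc, v: (acc[0] + v, acc[1] + 1),
                            lambda a, b: (a[0] + b[0], a[1] + b[1]))
    print(sums.mapValues(lambda x: x[0] / x[1]).collect())
    # [('hadoop', 5.0), ('spark', 4.0)]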
    

    Finding TOP Values

    The input file has 4 columns; find the top N values of the third column.

    from pyspark import SparkConf,SparkContext
    
    conf = SparkConf().setMaster("local").setAppName("TOP")
    sc = SparkContext(conf = conf)
    lines = sc.textFile("file:///usr/local/spark/mycode/rdd/file")
    # strip() removes surrounding whitespace; result1 drops empty lines and lines that do not have exactly 4 fields
    result1 = lines.filter(lambda line:(len(line.strip()) > 0) and (len(line.split(","))==4))
    # result2 keeps only the third column
    result2 = result1.map(lambda x:x.split(",")[2])
    # result3 turns each value into a key-value pair (x, ""); sortByKey needs key-value pairs
    result3 = result2.map(lambda x:(int(x),""))
    # result4 merges everything into a single partition; otherwise each partition is sorted separately
    result4 = result3.repartition(1)
    # result5 sorts by key in descending order
    result5 = result4.sortByKey(False)
    # result6 keeps only the key of each pair
    result6 = result5.map(lambda x:x[0])
    # result7 takes the first 5 values
    result7 = result6.take(5)
    for a in result7:
       print(a)
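
    Because only the N largest values are needed, the repartition/sortByKey/take chain can also be replaced by the top() action, which returns the N largest elements in descending order without first collapsing the data into one partition (a minimal sketch reusing result2 from above):

    # top(5) returns the 5 largest values as a list on the driver
    top5 = result2.map(lambda x: int(x)).top(5)
    for a in top5:
        print(a)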
    

    File Sorting

    Read all the integers from the input files and sort them.

    from pyspark import SparkConf,SparkContext
    
    # global counter used to attach a 1-based rank to each sorted value
    index = 0
    def getindex():
      global index
      index += 1
      return index
      
    def main():
      conf = SparkConf().setMaster("local[1]").setAppName("FileSort")
      sc = SparkContext(conf = conf)
      lines = sc.textFile("file:///usr/local/spark/mycode/rdd/filesort/file*.txt")
      # filter out empty lines
      result1 = lines.filter(lambda line:(len(line.strip()) > 0))
      result2 = result1.map(lambda x:(int(x.strip()),""))
      result3 = result2.repartition(1)
      result4 = result3.sortByKey(True)
      result5 = result4.map(lambda x:x[0])
      result6 = result5.map(lambda x:(getindex(),x))
      result6.foreach(print)
      result6.saveAsTextFile("file:///usr/local/spark/mycode/rdd/filesort/sortresult")
    if __name__ == '__main__':
      main()
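
    The global getindex() counter only works here because the data has been pulled into a single partition and the job runs with a local master. A common alternative is zipWithIndex(), which pairs every element with its position without any shared state; this is a minimal sketch reusing the sorted result5 RDD from above:

    # zipWithIndex() pairs each element with its 0-based position in the RDD
    ranked = result5.zipWithIndex().map(lambda x: (x[1] + 1, x[0]))
    ranked.foreach(print)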
    

    Secondary Sorting

    Given a file with two columns of integers, sort the data: first in descending order by the first column, and, when the first-column values are equal, in descending order by the second column.

    • Implement a custom sort key following the Ordered and Serializable interfaces
    • Load the file to be secondary-sorted into an RDD of <key, value> pairs
    • Use sortByKey to sort on the custom key
    • Drop the sort key and keep only the sorted values
    from pyspark import SparkConf,SparkContext
    from operator import gt
    
    class SecondarySortKey():
      def __init__(self, k):
        self.column1 = k[0]
        self.column2 = k[1]

      # compare by the first column, breaking ties with the second column
      def __gt__(self, other):
        if other.column1 == self.column1:
          return gt(self.column2, other.column2)
        else:
          return gt(self.column1, other.column1)
    
    def main():
      conf = SparkConf().setAppName('spark_sort').setMaster('local[1]')
      sc = SparkContext(conf = conf)
      file = "file:///usr/local/spark/mycode/secondarysort/file.txt"
      rdd1 = sc.textFile(file)
      rdd2 = rdd1.filter(lambda x: len(x.strip()) > 0)
      # build ((col1, col2), original line) pairs, then wrap the key in SecondarySortKey
      rdd3 = rdd2.map(lambda x:((int(x.split(" ")[0]),int(x.split(" ")[1])),x))
      rdd4 = rdd3.map(lambda x: (SecondarySortKey(x[0]),x[1]))
      rdd5 = rdd4.sortByKey(False)
      rdd6 = rdd5.map(lambda x:x[1])
      rdd6.foreach(print)
      
    if __name__ == '__main__':
      main()
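
    Since Python tuples already compare element by element, the same ordering can also be obtained without a custom key class by passing a (column1, column2) tuple to sortBy; this is a minimal sketch reusing the sc and file variables from main() above:

    rdd = sc.textFile(file).filter(lambda x: len(x.strip()) > 0)
    # sort descending on (first column, second column); tuples compare lexicographically
    sorted_rdd = rdd.sortBy(lambda x: (int(x.split(" ")[0]), int(x.split(" ")[1])), ascending=False)
    sorted_rdd.foreach(print)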
    
    All of the example code in this post comes from the Database Laboratory at Xiamen University.