zoukankan      html  css  js  c++  java
  • pyspark的排序

    一、count

    sql = """select video_id,count(video_id) as video_num from video_table group by video_id order by video_num desc"""
    rdd = spark.sql(sql).rdd.map(lambda x: x["video_id"])
    result = rdd.collect()
    

    二、sortBy和sortByKey

    from operator import add
    sql = """select video_id from video_table """
    rdd = spark.sql(sql).rdd.map(lambda x: (x["video_id"],1))..reduceByKey(add)
    rdd1 = rdd.sortBy(lambda x: x[1], ascending=False)
    rdd2 = rdd.sortByKey(lambda x: x, ascending=False)
    result = rdd1.collect() # rdd2.collect()
    

    1、sortBy如何实现全局排序

    sortBy实际上调用sortByKey

    def sortBy(self, keyfunc, ascending=True, numPartitions=None):
          return self.keyBy(keyfunc).sortByKey(ascending, numPartitions).values()
    

    2、sortBy的实现过程:

    Stage 0:Sample。创建 RangePartitioner,先对输入的数据的key做sampling来估算key的分布情况,然后按指定的排序切分出range,尽可能让每个partition对应的range里的key的数量均匀。计算出来的 rangeBounds 是一个长为 numPartitions - 1 的list,记录头 numPartitions - 1 个partition对应的range的上界;最后一个partition的边界就隐含在“剩余”当中。

     rddSize = self.count() # 统计rdd中包含的元素个数,假设rddSize=10000
     if not rddSize:
        return self  # empty RDD
    maxSampleSize = numPartitions * 20.0  # 假设有4个分区,maxSampleSize=80
    fraction = min(maxSampleSize / max(rddSize, 1), 1.0) # fraction=8/1000,
    samples = self.sample(False, fraction, 1).map(lambda kv: kv[0]).collect() # 采样 8/1000,根据采样出的数据来估算key的分布情况。
    samples = sorted(samples, key=keyfunc) # 对采样得到的rdd collect之后得到的列表,调用python的sorted方法,完成从小到大排序,得到排好序的列表。
    # 得到numPartition-1=3个边界列表。
    bounds = [samples[int(len(samples) * (i + 1) / numPartitions)]
                      for i in range(0, numPartitions - 1)]
    # partitionBy根据给定的3个边界进行分区,分区之后分区间的元素是排好序的。再调用mapPartitions,对每个分区的数据进行排序
    def rangePartitioner(k):
        p = bisect.bisect_left(bounds, keyfunc(k))
        if ascending:
            return p
        else:
            return numPartitions - 1 - p
    
    return self.partitionBy(numPartitions, rangePartitioner).mapPartitions(sortPartition, True)
    

    Stage 1:Shuffle Write。开始shuffle,在map side做shuffle write,根据前面计算出的rangeBounds来重新partition。
    通过key值和区间边界进行比较,如果位于改区间,则分配到该区间对应的分区。
    Shuffle write出的数据中,每个partition内的数据虽然尚未排序,但partition之间已经可以保证数据是按照partition index排序的了。
    Stage 2:Shuffle Read。然后到reduce side,每个reducer再对拿到的本partition内的数据做排序。这样完成之后,partition之间的数据在map side就保证有排序,而每个partition内的数据在reduce side也保证有排序,就达到了全局排序的效果。如果在 sortByKey() 后面跟一个 collect() 调用,则它会按照partition index的顺序获取结果数据,最后再把这些数组合并起来,就在本地得到了全局排序后的大数组。

    三、调用python方法

    from collections import Counter
    def category_trans(x):
        """
        统计每个分类下面视频出现的次数
        :param x:
        :return:
        """
        tag = x[0]
        videos = x[1]
        result = Counter(videos)
        r = sorted(result.items(), key=lambda item: item[1],reverse=True)
        return tag,[item[0] for item in r]
    rdd = rdd.map(lambda x:(x["tag"],[x["video_id"]])) # 此时rdd内的数据为[(tag1:[video_1]),(tag1,[video_2]),(tag2,[video_1]),...]
    video_rdd = rdd.reduceByKey(lambda x,y: x+y) # [(tag1,[v1,v2,...]),...]
    t2v = video_rdd.map(lambda x: category_trans(x))  # [(tag1,[排好序的列表]),...]
    result = t2v.collectAsMap()
    

    四、自定义类

    将rdd中元素转换为自定义类的实例

    class MySort():
        """
        自定义类的__lt__()方法。python的类中已经自带了lt,eq,ge,gt...等方法
        """
        def __init__(self,num):
            self.num = num
        def __lt__(self,other):
            return self.num<other.num
    
        def __repr__(self):
            return str(self.num)
    
    rdd = sc.parallelize([(1, 1),(1, 2), (-1, 1), (-1, -0.5)])
    rdd = rdd.map(MySort)
    rdd = rdd.sortBy(lambda x: x,ascending=False)
    rdd = rdd.foreach(lambda x: print(x))
    

    参考资料

    1、(Spark排序的原理? - RednaxelaFX的回答 - 知乎

    https://www.zhihu.com/question/34771277/answer/187001059)

  • 相关阅读:
    Redis 连接
    Redis 脚本
    Redis 事务
    Redis 发布订阅
    redis 字符串数据(string)
    Redis 键(key)
    Redis 命令
    Redis的五种数据类型
    java.lang.NoClassDefFoundError: org/apache/commons/logging/LogFactory 解决方案
    在命令行中运行eclipse中创建的java项目
  • 原文地址:https://www.cnblogs.com/leimu/p/15543606.html
Copyright © 2011-2022 走看看