zoukankan      html  css  js  c++  java
  • Spark学习笔记(13)---数据倾斜解决方案

    数据倾斜解决方案

    目录

    聚合源数据

    1. 第一种方案

    一般都是hive表中对每个key进行聚合,按照key进行分组,将key对应的所有的values,全部用一种特殊的格式,拼接到一个字符串里面去,比如“key=sessionid, value: action_seq=1|user_id=1|search_keyword=火锅|category_id=001;action_seq=2|user_id=1|search_keyword=涮肉|category_id=001”
    对key进行group,在spark中拿到的数据就是key=sessionid,values,所以在spark中就不需要再去执行groupByKey+map这种操作了。直接对每个key对应的values字符串,map操作,进行你需要的操作即可。key,values串。

    2. 第二种方案

    你可能没有办法对每个key,就聚合出来一条数据;那么也可以做一个妥协;对每个key对应的数据,10万条;有好几个粒度,比如10万条里面包含了几个城市、几天、几个地区的数据,现在放粗粒度;直接就按照城市粒度,做一下聚合,几个城市,几天、几个地区粒度的数据,都给聚合起来。

    提高shuffle操作的reduce并行度

    将reduce task的数量变多,就可以让每个reduce task分配到更少的数据量,这样的话,也许就可以缓解,或者甚至是基本解决掉数据倾斜的问题。

    方法主要是给我们所有的shuffle算子,比如groupByKey、countByKey、reduceByKey。在调用的时候,传入进去一个数字。那个数字,就代表了那个shuffle操作的reduce端的并行度。那么在进行shuffle操作的时候,就会对应着创建指定数量的reduce task。就可以让每个reduce task分配到更少的数据。基本可以缓解数据倾斜的问题。

    使用随机key实现双重聚合

    如下图所示, 我们可以把导致数据倾斜的key打上不同的随机前缀,这样就划分成多个不同的key

    groupByKey和reduceByKey使用这用会有很好的效果。

    将reduce join转为map join

    reduce join就是普通的join,是需要shuffle的。如下图所示

    如果两个RDD要进行join,其中一个RDD是比较小的。一个RDD是100万数据,一个RDD是1万数据。broadcast出去那个小RDD的数据以后,就会在每个executor的block manager中都驻留一份。
    要确保你的内存足够存放那个小RDD中的数据

    sample采样倾斜key进行两次join

    可以将数据抽样,得到样本中次数最多的key,可能就是会导致数据倾斜的key。然后就把数据分为普通key和产生倾斜的key分别join,最后在合并。如下图:

    对于join,优先采用上一小节的内容,将reduce join转为map join。如果是两个RDD数据都很大,而且有一两个key导致数据倾斜,就可以考虑采用这种方案。如果导致数据倾斜的key特别多,那么请看下一小节。

    这种方式还有一种更优的操作,就是根据那个key在另外一个要join的表中,也过滤出一份数据,然后对这份数据进行flatMap,打上100以内的随机数,作为前缀。对于分开的可能导致数据倾斜的RDD,也给每条数据都打上一个100以内的随机数,做为前缀。再去join,性能会更好。

    使用随机数以及扩容表进行join

    其实这个操作的原理已经在上一讲的改进方案中提到了,就是选择一个RDD,使用flatMap,将每条数据打上10以内的随机数,作为前缀;另一个RDD,使用map,每条数据都打上一个10以内的随机数。最后进行join。

    这种情况适用于每个RDD都很大,导致倾斜的key也很多,是实在没有办法的办法。

    参考资料

    《北风网Spark项目实战》
    github:https://github.com/yangtong123/StudySpark

  • 相关阅读:
    MySQL百万级、千万级数据多表关联SQL语句调优
    不就是SELECT COUNT语句吗,居然有这么多学问
    分布式锁讲解
    Java 中堆和栈的区别
    Java中的回调机制
    在Eclipse上Maven环境配置使用
    项目忽然出现 The superclass "javax.servlet.http.HttpServlet" was not found on the Java Build Path 解决方法
    HttpServletResponse
    com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: No operations allowed after connection closed.
    深入浅出java常量池
  • 原文地址:https://www.cnblogs.com/yangtong/p/7158830.html
Copyright © 2011-2022 走看看