zoukankan      html  css  js  c++  java
  • Spark 数据倾斜解决方案:原理+聚合源数据+提高reduce并行度+双重聚合+map join+sample采样+扩容表

    一.数据倾斜解决方案之原理以及现象分析

     1、数据倾斜的现象

    在任何大数据类的项目中,都是最棘手的性能问题,最能体现人的技术能力,最能体现RD(Research Developer,研发工程师)的技术水平。

    数据倾斜 = 性能杀手

    如果没有丰富的经验,或者没有受过专业的技术培训,是很难解决数据倾斜问题的

     

    在执行shuffle操作的时候,大家都知道,我们之前讲解过shuffle的原理。是按照key,来进行values的数据的输出、拉取和聚合的。

    同一个key的values,一定是分配到一个reduce task进行处理的。

    多个key对应的values,总共是90万。但是问题是,可能某个key对应了88万数据,key-88万values,分配到一个task上去面去执行。

    另外两个task,可能各分配到了1万数据,可能是数百个key,对应的1万条数据。

     

    想象一下,出现数据倾斜以后的运行的情况。很糟糕!极其糟糕!无比糟糕!

    第一个和第二个task,各分配到了1万数据;那么可能1万条数据,需要10分钟计算完毕;

    第一个和第二个task,可能同时在10分钟内都运行完了;

    第三个task要88万条,88 * 10 = 880分钟 = 14.5个小时;

    大家看看,本来另外两个task很快就运行完毕了(10分钟),

    但是由于一个拖后腿的家伙,第三个task,要14.5个小时才能运行完,就导致整个spark作业,也得14.5个小时才能运行完。

    导致spark作业,跑的特别特别特别特别慢!!!像老牛拉破车!

    数据倾斜,一旦出现,是不是性能杀手。。。。

    2、发生数据倾斜以后的现象:

    spark数据倾斜,有两种表现:

    (1)有的task快,有的task慢

    你的大部分的task,都执行的特别特别快,刷刷刷,就执行完了(你要用client模式,standalone client,yarn client,本地机器主要一执行spark-submit脚本,就会开始打印log),

    task175 finished;剩下几个task,执行的特别特别慢,前面的task,一般1s可以执行完5个;最后发现1000个task,998,999 task,要执行1个小时,2个小时才能执行完一个task。

    出现数据倾斜了

    还算好的,因为虽然老牛拉破车一样,非常慢,但是至少还能跑。

    (2)直接报错

    运行的时候,其他task都刷刷刷执行完了,也没什么特别的问题;但是有的task,就是会突然间,啪,报了一个OOM,JVM Out Of Memory,

    内存溢出了,task failed,task lost,resubmitting task。反复执行几次都到了某个task就是跑不通,最后就挂掉。

    某个task就直接OOM,那么基本上也是因为数据倾斜了,task分配的数量实在是太大了!!!

    所以内存放不下,然后你的task每处理一条数据,还要创建大量的对象。内存爆掉了。

    出现数据倾斜了

    这种就不太好了,因为你的程序如果不去解决数据倾斜的问题,压根儿就跑不出来。

    作业都跑不完,还谈什么性能调优这些东西。扯淡。。。


    3、定位原因与出现问题的位置:

    根据log去定位

    出现数据倾斜的原因,基本只可能是因为发生了shuffle操作,在shuffle的过程中,出现了数据倾斜的问题。

    因为某个,或者某些key对应的数据,远远的高于其他的key。

    1、你在自己的程序里面找找,哪些地方用了会产生shuffle的算子,groupByKey、countByKey、reduceByKey、join

    2、看log

    log一般会报是在你的哪一行代码,导致了OOM异常;或者呢,看log,看看是执行到了第几个stage!!! 

    二.数据倾斜解决方案之聚合源数据以及过滤导致倾斜的key

    数据倾斜的解决,跟之前讲解的性能调优,有一点异曲同工之妙。

    性能调优,跟大家讲过一个道理,“重剑无锋”。性能调优,调了半天,最有效,最直接,最简单的方式,就是加资源,加并行度,注意RDD架构(复用同一个RDD,加上cache缓存);shuffle、jvm等,次要的。

    数据倾斜,解决方案,第一个方案和第二个方案,一起来讲。最朴素、最简谱、最直接、最有效、最简单的,解决数据倾斜问题的方案。

    第一个方案:聚合源数据
    第二个方案:过滤导致倾斜的key

    重剑无锋。后面的五个方案,尤其是最后4个方案,都是那种特别炫酷的方案。双重group聚合方案;sample抽样分解聚合方案;如果碰到了数据倾斜的问题。上来就先考虑考虑第一个和第二个方案,能不能做,如果能做的话,后面的5个方案,都不用去搞了。

    有效。简单。直接。效果是非常之好的。彻底根除了数据倾斜的问题。

    (1)第一个方案:聚合源数据

    咱们现在,做一些聚合的操作,groupByKey、reduceByKey;groupByKey,说白了,就是拿到每个key对应的values;reduceByKey,说白了,就是对每个key对应的values执行一定的计算。

    现在这些操作,比如groupByKey和reduceByKey,包括之前说的join。都是在spark作业中执行的。

    spark作业的数据来源,通常是哪里呢?90%的情况下,数据来源都是hive表(hdfs,大数据分布式存储系统)。hdfs上存储的大数据。hive表,hive表中的数据,

    通常是怎么出来的呢?

    有了spark以后,hive比较适合做什么事情?hive就是适合做离线的,

    晚上凌晨跑的,ETL(extract transform load,数据的采集、清洗、导入),hive sql,去做这些事情,从而去形成一个完整的hive中的数据仓库;说白了,数据仓库,就是一堆表。

    spark作业的源表,hive表,其实通常情况下来说,也是通过某些hive etl生成的。hive etl可能是晚上凌晨在那儿跑。今天跑昨天的数九。

    1.聚合源数据方案,第一种做法

    数据倾斜,某个key对应的80万数据,某些key对应几百条,某些key对应几十条;现在,咱们直接在生成hive表的hive etl中,对数据进行聚合。

    比如按key来分组,将key对应的所有的values,全部用一种特殊的格式,拼接到一个字符串里面去,

    比如“key=sessionid, value: action_seq=1|user_id=1|search_keyword=火锅|category_id=001;action_seq=2|user_id=1|search_keyword=涮肉|category_id=001”。

    对key进行group,在spark中,拿到key=sessionid,values<Iterable>;hive etl中,直接对key进行了聚合。

    那么也就意味着,每个key就只对应一条数据。在spark中,就不需要再去执行groupByKey+map这种操作了。直接对每个key对应的values字符串,map操作,进行你需要的操作即可。key,values串。

    spark中,可能对这个操作,就不需要执行shffule操作了,也就根本不可能导致数据倾斜。

    或者是,对每个key在hive etl中进行聚合,对所有values聚合一下,不一定是拼接起来,可能是直接进行计算。reduceByKey,计算函数,应用在hive etl中,每个key的values。


    2.聚合源数据方案,第二种做法

    你可能没有办法对每个key,就聚合出来一条数据;

    那么也可以做一个妥协;对每个key对应的数据,10万条;有好几个粒度,比如10万条里面包含了几个城市、几天、几个地区的数据,

    现在放粗粒度;直接就按照城市粒度,做一下聚合,几个城市,几天、几个地区粒度的数据,都给聚合起来。比如说

    city_id date area_id

    select ... from ... group by city_id

    尽量去聚合,减少每个key对应的数量,也许聚合到比较粗的粒度之后,原先有10万数据量的key,现在只有1万数据量。减轻数据倾斜的现象和问题。


    上面讲的第一种方案,其实这里没法讲的太具体和仔细;只能给一个思路。但是我觉得,思路已经讲的非常清晰了;

    一般来说,大家只要有一些大数据(hive)。经验,我觉得都是可以理解的。

    具体怎么去在hive etl中聚合和操作,就得根据你碰到数据倾斜问题的时候,你的spark作业的源hive表的具体情况,具体需求,具体功能,具体分析。


    对于我们的程序来说,完全可以将aggregateBySession()这一步操作,放在一个hive etl中来做,形成一个新的表。

    对每天的用户访问行为数据,都按session粒度进行聚合,写一个hive sql。

    在spark程序中,就不要去做groupByKey+mapToPair这种算子了。

    直接从当天的session聚合表中,用Spark SQL查询出来对应的数据,即可。这个RDD在后面就可以使用了。

    (2)第二个方案:过滤导致倾斜的key

    如果你能够接受某些数据,在spark作业中直接就摒弃掉,不使用。比如说,总共有100万个key。只有2个key,是数据量达到10万的。其他所有的key,对应的数量都是几十。

    这个时候,你自己可以去取舍,如果业务和需求可以理解和接受的话,在你从hive表查询源数据的时候,直接在sql中用where条件,过滤掉某几个key。

    那么这几个原先有大量数据,会导致数据倾斜的key,被过滤掉之后,那么在你的spark作业中,自然就不会发生数据倾斜了。

    三.数据倾斜解决方案之提高shuffle操作reduce并行度

    第一个和第二个方案,都不适合做。

    第三个方案,提高shuffle操作的reduce并行度

    将reduce task的数量,变多,就可以让每个reduce task分配到更少的数据量,这样的话,也许就可以缓解,或者甚至是基本解决掉数据倾斜的问题。

    (1)提升shuffle reduce端并行度,怎么来操作?

    很简单,主要给我们所有的shuffle算子,比如groupByKey、countByKey、reduceByKey。

    在调用的时候,传入进去一个参数。一个数字。那个数字,就代表了那个shuffle操作的reduce端的并行度。那么在进行shuffle操作的时候,就会对应着创建指定数量的reduce task。

    这样的话,就可以让每个reduce task分配到更少的数据。基本可以缓解数据倾斜的问题。

    比如说,原本某个task分配数据特别多,直接OOM,内存溢出了,程序没法运行,直接挂掉。按照log,找到发生数据倾斜的shuffle操作,给它传入一个并行度数字,

    这样的话,原先那个task分配到的数据,肯定会变少。就至少可以避免OOM的情况,程序至少是可以跑的。


    (2)提升shuffle reduce并行度的缺陷

    治标不治本的意思,因为,它没有从根本上改变数据倾斜的本质和问题。不像第一个和第二个方案(直接避免了数据倾斜的发生)。

    原理没有改变,只是说,尽可能地去缓解和减轻shuffle reduce task的数据压力,以及数据倾斜的问题。

    (3)实际生产环境中的经验。

    1、如果最理想的情况下,提升并行度以后,减轻了数据倾斜的问题,或者甚至可以让数据倾斜的现象忽略不计,那么就最好。就不用做其他的数据倾斜解决方案了。

    2、不太理想的情况下,就是比如之前某个task运行特别慢,要5个小时,现在稍微快了一点,变成了4个小时;

    或者是原先运行到某个task,直接OOM,现在至少不会OOM了,但是那个task运行特别慢,要5个小时才能跑完。

    那么,如果出现第二种情况的话,各位,就立即放弃第三种方案,开始去尝试和选择后面的四种方案。

     

    四.数据倾斜解决方案之使用随机key实现双重聚合

    使用随机key实现双重聚合

    1、原理

    2、使用场景
    (1)groupByKey
    (2)reduceByKey

    比较适合使用这种方式;join,咱们通常不会这样来做,后面会讲三种,针对不同的join造成的数据倾斜的问题的解决方案。

    第一轮聚合的时候,对key进行打散,将原先一样的key,变成不一样的key,相当于是将每个key分为多组;

    先针对多个组,进行key的局部聚合;接着,再去除掉每个key的前缀,然后对所有的key,进行全局的聚合。

    对groupByKey、reduceByKey造成的数据倾斜,有比较好的效果。

    如果说,之前的第一、第二、第三种方案,都没法解决数据倾斜的问题,那么就只能依靠这一种方式了。

    五.数据倾斜解决方案之将reduce join转换为map join

     

    (1)reduce join转换为map join,适合在什么样的情况下,可以来使用?

    如果两个RDD要进行join,其中一个RDD是比较小的。一个RDD是100万数据,一个RDD是1万数据。(一个RDD是1亿数据,一个RDD是100万数据)

    其中一个RDD必须是比较小的,broadcast出去那个小RDD的数据以后,就会在每个executor的block manager中都驻留一份。要确保你的内存足够存放那个小RDD中的数据

    这种方式下,根本不会发生shuffle操作,肯定也不会发生数据倾斜;从根本上杜绝了join操作可能导致的数据倾斜的问题;

    对于join中有数据倾斜的情况,大家尽量第一时间先考虑这种方式,效果非常好;如果某个RDD比较小的情况下。


    (2)不适合的情况:

    两个RDD都比较大,那么这个时候,你去将其中一个RDD做成broadcast,就很笨拙了。很可能导致内存不足。最终导致内存溢出,程序挂掉。

    而且其中某些key(或者是某个key),还发生了数据倾斜;此时可以采用最后两种方式。

    对于join这种操作,不光是考虑数据倾斜的问题;即使是没有数据倾斜问题,也完全可以优先考虑,用我们讲的这种高级的reduce join转map join的技术,

    不要用普通的join,去通过shuffle,进行数据的join;完全可以通过简单的map,使用map join的方式,牺牲一点内存资源;在可行的情况下,优先这么使用。

    不走shuffle,直接走map,是不是性能也会高很多?这是肯定的。

    六.数据倾斜解决方案之sample采样倾斜key单独进行join

     

     

    这个方案的实现思路,跟大家解析一下:其实关键之处在于,将发生数据倾斜的key,单独拉出来,放到一个RDD中去;就用这个原本会倾斜的key RDD跟其他RDD,单独去join一下,这个时候,key对应的数据,可能就会分散到多个task中去进行join操作。

    就不至于说是,这个key跟之前其他的key混合在一个RDD中时,肯定是会导致一个key对应的所有数据,都到一个task中去,就会导致数据倾斜。

     

    这种方案什么时候适合使用?

    优先对于join,肯定是希望能够采用上一讲讲的,reduce join转换map join。两个RDD数据都比较大,那么就不要那么搞了。

    针对你的RDD的数据,你可以自己把它转换成一个中间表,或者是直接用countByKey()的方式,你可以看一下这个RDD各个key对应的数据量;此时如果你发现整个RDD就一个,或者少数几个key,是对应的数据量特别多;尽量建议,比如就是一个key对应的数据量特别多。

    此时可以采用咱们的这种方案,单拉出来那个最多的key;单独进行join,尽可能地将key分散到各个task上去进行join操作。

    什么时候不适用呢?

    如果一个RDD中,导致数据倾斜的key,特别多;那么此时,最好还是不要这样了;还是使用我们最后一个方案,终极的join数据倾斜的解决方案。

    就是说,咱们单拉出来了,一个或者少数几个可能会产生数据倾斜的key,然后还可以进行更加优化的一个操作;

    对于那个key,从另外一个要join的表中,也过滤出来一份数据,比如可能就只有一条数据。userid2infoRDD,一个userid key,就对应一条数据。

    然后呢,采取对那个只有一条数据的RDD,进行flatMap操作,打上100个随机数,作为前缀,返回100条数据。

    单独拉出来的可能产生数据倾斜的RDD,给每一条数据,都打上一个100以内的随机数,作为前缀。

    再去进行join,是不是性能就更好了。肯定可以将数据进行打散,去进行join。join完以后,可以执行map操作,去将之前打上的随机数,给去掉,然后再和另外一个普通RDD join以后的结果,进行union操作。

    七.数据倾斜解决方案之使用随机数以及扩容表进行join

     

  • 相关阅读:
    PAT 解题报告 1009. Product of Polynomials (25)
    PAT 解题报告 1007. Maximum Subsequence Sum (25)
    PAT 解题报告 1003. Emergency (25)
    PAT 解题报告 1004. Counting Leaves (30)
    【转】DataSource高级应用
    tomcat下jndi配置
    java中DriverManager跟DataSource获取getConnection有什么不同?
    理解JDBC和JNDI
    JDBC
    Dive into python 实例学python (2) —— 自省,apihelper
  • 原文地址:https://www.cnblogs.com/Transkai/p/11455818.html
Copyright © 2011-2022 走看看