zoukankan      html  css  js  c++  java
  • hive优化--建议多看几遍

    转载地址:

    架构层面优化:

    l  分表

    l  合理利用中间结果集,避免查过就丢的资源浪费,减低Hadoop的IO负载

    l  常用复杂或低效函数尽量不用或拆分成其他实现方式,如count(distinct)

    l  合理设计表分区,静态分区和动态分区

    l  优化时一定要把握整体,单个作业最优不如整个作业最优。

    l  文件存储格式和压缩方式

    l  Hadoop本身的优化

    l  有些逻辑,使用系统函数可能比较复杂,可能涉及多层嵌套,建议使用自定义函数实现。

    架构层面优化,我这里不做过多介绍了,写HQL时要时常考虑按照map-reduce执行方式来写,平时多注意一下,很多问题都可以避免的。下面的介绍的优化中,或多或少对架构层面的优化都有涉及。

    参数层面优化手段

    l  合理控制map和reduce数

    l  合并小文件

    l  避免数据倾斜,解决数据倾斜问题

    l  减少job数(合并Job,大Job拆分)

    l  Hive Job优化

    合理控制mappers和reducers数

    Mappers数

    Mappers过多情况下:

    l  Map阶段输出文件太小,产生大量小文件

    l  初始化和创建Mappers进程的开销很大

    Mappers太少情况下:

    l  文件处理或查询并发度小,Job执行时间过长

    l  大量作业时,容易堵塞集群

    通常情况下,Job会通过输入文件产生一个或多个mapper数,

    主要的决定因素有两个:输入的文件数,输入的文件大小。

    举例:

    a)      假设输入只有1个文件a,大小为780M,那么hadoop会将该文件a分隔成7个block(6个128M的block和1个12M的block,dfs.block.size是128M),从而产生7个mappers数。

    b)      假设输入有3个文件a、b、c,大小分别为10m,20m,130m,那么hadoop会将其分隔为4个block(10m,20m,128m,2m),从而产生4个mappers数。

    注释:以上两种情况均不考虑文件合并的情况。

    两种方式控制Mapper数:即减少mapper数和增加mapper数

    l  减少mapper数可以通过合并小文件来实现,这点是对文件源处理。

    l  增加mapper数可以通过控制上一个job的reducer数来控制(比如:一个sql中多表join会产生多个Map-Reduce任务)。

    比如增大mapred.reduce.tasks数值。

    下面介绍map端的几个控制参数:

    l  set mapred.map.tasks=10;

    此参数直接设置,有时并不生效,其实它是hadoop的参考数值。

    下面我说一下直接设置不生效的原因:

    默认mapper个数计算为:

    # total_size为输入文件总大小,dfs_block_size为HDFS设置的数据块大小(一般为128MB)

    default_mapper_num=total_size/dfs_block_size;

    我们通过参数直接设置的期望mapper个数为:

    # setmapred.map.tasks=10;

    #这个参数设置只有在大于default_mapper_num的时候,才会生效

    goal_mapper_num=mapred.map.tasks;

    下面我们来计算一下,经过map端split处理的文件大小和个数:

    #mapred.min.split.size(数据的最小分割单元大小)

    #mapred.min.split.size 设置每个task处理的文件大小,只有在大于dfs_block_size值时才会生效

    split_size=max(mapred.min.split.size,dfs_block_size);

    split_num=total_size/split_size;

    最终计算的mapper个数:

    compute_mapper_num=min(split_num,max(default_mapper_num,goal_mapper_num))

            

             总结:

             其实根据我自己的实践,调整mapper数之前,我们一定要确定处理的文件大概大小以及文件的存在形式(很多小文件,还是单个大文件以及其他形式),然后合理地调整mapred.min.split.size和mapred.max.split.size的值。

    比如,如果想减少mapper个数,则需要增大mapred.min.split.size的值(因为dfs_block_size一般不变)。

             示例:

    情况1:输入文件很大,但不是小文件组成的

    增大mapred.min.split.size的值。

    情况2:输入文件数量很多,且都是小文件,同时每个文件都小于dfs_block_size。

    这种情况下通过增大mapred.min.split.size不可行。

    原因:增大mapred.min.split.size会造成小文件在网络上来回传输,造成网络负载很大。

    解决办法:需要设置下面参数,使用合并小文件方法,将多个输入文件合并后送给mapper处理,从而减少mapper的数量。

    set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

             (下个小章节会介绍小文件合并的优化)

    l  Map端的聚合,减少Reduce处理负担:

    sethive.map.aggr=true;

    l  推测执行:

    set mapred.map.tasks.speculative.execution=true;

    (reduce端也有类似的参数:mapred.reduce.tasks.speculative.execution)

    所谓的推测执行,就是当所有的task都开始运行之后,Job Tracker会统计所有任务的平均进度,如果某个task所在的task node节点配置比较低或者CPU负载很大,导致任务执行比总体任务的平均执行要慢,此时Job Tracker就会在其他节点启动一个新的任务(duplicatetask),原有任务和新任务哪个先执行完就把其他节点的另外一个任务kill掉。这也是我们经常在Job Tracker页面看到,虽然任务执行成功了,但是发现一些任务被kill掉了,就是这个原因。

    reduce数

    l  Reducers数过多的情况:

    生成了很多个小文件(最终输出文件由reducer决定,一个reducer输出一个文件),那么如果这些小文件作为下一个Job输入,则会出现小文件过多需要进行合并的问题。而且启动和初始化reducer需要耗费时间和资源。

    l  Reducers数过少:

    执行耗时,并且可能出现数据倾斜

    l  Reducer个数的决定:

               默认情况下,Hive分配reducer个数由下列参数决定:

                        参数1:hive.exec.reducers.bytes.per.reducer(默认为1G)

                        参数2:hive.exec.reducers.max(默认为999)

             计算reducer数的公式:

               N=min(参数2,总输入数据量/参数1)

               即默认一个reduce处理1G数据量。

    注意:与mapred.map.tasks参数不同,如果设置了setmapred.reduce.tasks参数的数值,忽略上述计算,reducer个数可以由mapred.reduce.tasks直接指定。

    l  以下情况只有一个reducer:

    某些情况下我们会发现任务中不管数据量多大,不管怎么调整reducer相关的的参数,任务中一直都只有一个reducer任务:

    1、  除了数据量小于hive.exec.reducers.bytes.per.reducer参数值的情况外

    2、  用了group by的汇总

    3、  用了order by

    l  Reduce数决定中间或落地文件数,文件大小和HDFS的block大小无关。

    l  使用场景描述:

    当某个任务有多个Job时,其中某个Job的结果被后面Job多次引用时,设大该参数,以便增加后面访问的Mapper数。

    比如,如果一个Job的输出被另外多个Job调用,假如最前面的Job只生成1G的一个文件,那么后面Job也只会有一个Map来处理,效率明显低很多。

    l  推测执行

    setmapred.reduce.tasks.speculative.execution =true;

    sethive.mapred.reduce.tasks.speculative.execution =true;

    可以看到除了Map-Reduce侧提供推测执行参数,hive侧也提供了推测执行的参数。

    合并小文件

    l  Map阶段Hive自动对小文件进行合并

    参数控制:

    #Map任务结束时就会合并小文件(Map-Only)

    set hive.merge.mapfiles=true;

    #在Map-Reduce的任务结束时合并小文件

    set hive.merge.mapredfiles=true;

    #合并文件的大小(256MB)

    set hive.merge.size.per.task=256000000;

    #每个mapper最大分隔大小(输入大小)

    #结合上面块大小(dfs.block.size=128MB),决定拆分几个mapper数

    set mapred.max.split.size=256000000;

    #一个节点上split至少的大小

    set mapred.min.split.size.per.node=100000000;

    #执行Map前进行小文件合并

    set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

    在开启了org.apache.hadoop.hive.ql.io.CombineHiveInputFormat之后,一个datanode节点上多个小文件会进行合并,合并文件数由mapred.max.split.size限制的大小决定。

    mapred.min.split.size.per.node决定多个datanode上的文件是否需要合并,即多个节点上的文件也可以合并,大小由此决定。

    l  Job合并输入小文件

    set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat

    多个小文件由一个map执行。

    合并文件数由mapred.max.split.size限制的大小决定。

    l  Job合并输出小文件

    sethive.merge.smallfiles.avgsize=256000000;

    当输出文件平均大小小于该值,启动新job用于合并文件。

    对于多个job时,前一个job输出很多大小不均匀的数据文件,对后续的job处理会造成数据倾斜的问题。

    如果输出文件大小均匀,则后续处理的mapper数比较合理。

    sethive.merge.size.per.task=64000000;

    合并之后的文件大小。

    案例(那我们自己的开发环境来测试,我们的环境dfs.block.size为64MB):

    环境如下(默认配置):

    set dfs.block.size=134217728;

    sethive.merge.mapfiles=true;

    sethive.merge.mapredfiles=false;

    sethive.merge.size.per.task=256000000;

    setmapred.max.split.size=256000000;

    setmapred.min.split.size.per.node=256000000;

    setmapred.min.split.size.per.rack=256000000;

    sethive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

    我构造的表如下:

    hive (annuity_safe)> desc formattedtest_data;

    OK

    col_name        data_type       comment

    # col_name              data_type               comment

    deptno                  string

    polno                   string

    certno                  string

    brno                    decimal(2,0)

    ………………………………

    set_of_books_id         string

    is_return               string

    pk_serial               string

    op_month                string

    # Detailed Table Information

    Database:               annuity_safe

    Owner:                  hduser0103

    CreateTime:             Tue Jul 28 11:41:20 CST 2015

    LastAccessTime:         UNKNOWN

    Protect Mode:           None

    Retention:              0

    Location:               hdfs://dev-l002781.app.paic.com.cn:9000/user/hive/warehouse/annuity_safe.db/test_data

    Table Type:             MANAGED_TABLE

    Table Parameters:

           COLUMN_STATS_ACCURATE   true

           numFiles                5

           numRows                 0

           rawDataSize             0

           totalSize               279752108

           transient_lastDdlTime   1438055637

    # Storage Information

    SerDe Library:         org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

    InputFormat:           org.apache.hadoop.mapred.TextInputFormat

    OutputFormat:          org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

    Compressed:             No

    Num Buckets:            -1

    Bucket Columns:         []

    Sort Columns:           []

    Storage Desc Params:

           serialization.format    1

    Time taken: 0.083 seconds, Fetched: 95row(s)

    构造的表对应的文件路径下有5个文件,文件大小除了xae文件为11MB,其他都大概为64MB。

    hive (annuity_safe)> ! hadoop fs -lsrhdfs://dev-l002781.app.paic.com.cn:9000/user/hive/warehouse/annuity_safe.db/test_data;

    -rw-r-----   3hduser0103 hduser0103   671088652015-07-28 11:53 /user/hive/warehouse/annuity_safe.db/test_data/xaa

    -rw-r-----   3hduser0103 hduser0103   671088642015-07-28 11:53 /user/hive/warehouse/annuity_safe.db/test_data/xab

    -rw-r-----   3hduser0103 hduser0103   671088642015-07-28 11:53 /user/hive/warehouse/annuity_safe.db/test_data/xac

    -rw-r-----   3hduser0103 hduser0103   671088642015-07-28 11:53 /user/hive/warehouse/annuity_safe.db/test_data/xad

    -rw-r-----   3hduser0103 hduser0103   113166512015-07-28 11:53 /user/hive/warehouse/annuity_safe.db/test_data/xae

    执行查询(未优化参数情况下):

    hive (annuity_safe)> selectdeptno,count(1),min(fcd),max(fcd) from test_data group by deptno;

    Total jobs = 1

    Launching Job 1 out of 1

    Number of reduce tasks not specified.Estimated from input data size: 1

    In order to change the average load for areducer (in bytes):

      sethive.exec.reducers.bytes.per.reducer=<number>

    In order to limit the maximum number ofreducers:

      sethive.exec.reducers.max=<number>

    In order to set a constant number ofreducers:

      setmapred.reduce.tasks=<number>

    Starting Job = job_201507191627_38369,Tracking URL =http://dev-l002781.app.paic.com.cn:50030/jobdetails.jsp?user.name=hadoop&jobid=job_201507191627_38369

    Kill Command =/appcom/HadoopInstall/hadoop-1.2.1/libexec/../bin/hadoop job  -kill job_201507191627_38369

    Hadoop job information for Stage-1: number of mappers:4; number of reducers: 1

    2015-07-28 13:44:01,643 Stage-1 map =0%,  reduce = 0%

    2015-07-28 13:44:13,786 Stage-1 map =50%,  reduce = 0%, Cumulative CPU 12.15sec

    2015-07-28 13:44:14,800 Stage-1 map = 75%,  reduce = 0%, Cumulative CPU 13.89 sec

    2015-07-28 13:44:37,214 Stage-1 map =100%,  reduce = 0%, Cumulative CPU 18.61sec

    2015-07-28 13:44:38,228 Stage-1 map =100%,  reduce = 33%, Cumulative CPU 18.61sec

    2015-07-28 13:44:40,254 Stage-1 map =100%,  reduce = 100%, Cumulative CPU22.29 sec

    MapReduce Total cumulative CPU time: 22seconds 290 msec

    Ended Job = job_201507191627_38369

    MapReduce Jobs Launched:

    Job 0: Map: 4  Reduce: 1  Cumulative CPU: 22.29 sec   HDFSRead: 279753293 HDFS Write: 6927 SUCCESS

    Total MapReduce CPU Time Spent: 22 seconds290 msec

    OK

    deptno _c1     _c2     _c3

    -08 01:20:15.0  1      NULL    NULL

    0000   1       NULL    NULL

    01000592864     1      G000000000      G000000000

    71679  1       G000000000      G000000000

    G01    1       2001-07-19 15:46:42.0   2001-07-19 15:46:42.0

    G010103 9       2002-09-17 14:08:45.0   2002-09-17 14:10:44.0

    G0123  6161    2006-08-16 14:59:06.0   2010-01-18 15:06:14.0

    G014205 51893   2010-10-08 01:05:01.0   2010-10-08 01:23:55.0

    G014302 4       2011-09-26 19:12:36.0   2011-09-26 19:12:36.0

    G02    32      2000-07-18 09:53:52.0   2002-06-10 13:14:24.0

    G020105 10      2000-06-20 14:08:23.0   2000-08-08 10:37:10.0

    G020301 5       2000-08-15 10:35:00.0   2000-08-21 11:29:07.0

    G020302 2       2000-08-17 09:31:00.0   2000-09-08 10:45:39.0

    …………………………………………………

    可以发现,出现了4个mappers来处理数据文件。

    我们查看页面查看4个task的Counters发现,4个task读取的文件字节数为:

    67,109,107

    201,327,048

    11,316,894

    244

    加起来为268MB左右,但是每个task处理的数据不均匀,其中有一个task处理了约200MB的数据,一个task处理了244字节的数据,便会出现木桶效应。

    案例优化:

    1、 环境的参数优化:

    set hive.merge.mapfiles=true;

    set hive.merge.mapredfiles=true;

    sethive.merge.size.per.task=64000000;

    set mapred.min.split.size=64000000;

    set mapred.max.split.size=64000000;

    setmapred.min.split.size.per.node=64000000;

    set mapred.min.split.size.per.rack=64000000;

    sethive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

    因为合并小文件默认为true,而dfs.block.size与hive.merge.size.per.task的搭配使得合并后的绝大部分文件都在64MB左右。

             我们不用执行上面的查询语句,就大概可以分析如下:

    对于上面表对应的5个数据文件:4个为64MB,1个为11MB,那么上面的查询,会有5个mapper,其中4个mapper分布处理64MB的数据,其他1个mapper分布处理11MB的数据。

    2、  运行上面的HQL,出现的结果与我们上面分析的一致。

    总结:这里我只是提供了一种优化的思路,其实这里还不是最优,我们可以实际的Hadoop环境,hdfs的block大小以及表对应的数据文件,来调整上面的参数(比如将64MB全部修改为128MB,或许更快一点),最终每个task处理的数据大致相同,均衡IO负载,以达到资源最佳使用。

    数据倾斜

    1、   什么是数据倾斜?

    Hadoop框架的特性决定最怕数据倾斜。

    JobTracker和TaskTracker关系相当于老师和学生的关系,JobTracker分发任务给TaskTracker去处理各自的工作,但是不是平均分配的,还得根据TaskTracker本地的数据量多少去做判断。如果每个节点数据分配不均匀,势必造成有的TaskTracker处理的数据量大,有的处理的数据量小。

    由于数据分布不均匀,可能会造成数据大量集中到一个节点或极少数个节点,造成数据热点。

    2、   数据倾斜造成的症状:

    map阶段快,reduce阶段非常慢,

              某些mapper很快,某些mapper很慢,

    某些reducer很快,某些reducer奇慢。

    3、   数据倾斜可能在如下场景中出现:

    A、 数据在节点上分布不均匀(无法避免)

    B、 join时on关键词中个别值量很大(如null值)

    C、 count(distinct)在数据量大的情况下,容易数据倾斜,因为count(distinct)是按group by字段分组,按distinct字段排序(有时无法避免)。

    其中A无法避免,B见后边介绍的Join优化部分,C语法上有时无法避免。

    关键词

    情形

    后果

    Join

    其中一个表较小,

    但是key集中

    分发到某一个或几个Reducer上的数据远高于平均值

    大表与大表,但是分桶的判断字段0值或空值过多

    这些空值都由一个reducer处理,非常慢

    group by

    group by 维度过小,

    某值的数量过多

    处理某值的reducer灰常耗时

    Count Distinct

    某特殊值过多

    处理此特殊值的reducer耗时

    减少job数(合并Job,大Job拆分)

    1、   减少Job数

    当源表相同时,如下可以合并Job,从而减少job数:

    l  Join时,On 字段相同

    多表join on条件相同时,合并为一个Map-Reduce。

    select pt.page_id,count(t.url) pv

    from rpt_page_type pt

    join

    (select refer_page_id,url_page_id,url from trackinfo where ds = ‘2013-10-11’)t

    on pt.page_id = t.url_page_id

    join

    (select page_id from rpt_page_kpi_new where ds = ‘2013-10-11’)r

    on t.url_page_id= r.page_id

    group by  pt.page_id;

    利用这个特性,可以把相同join on条件的放在一个job处理。

    l  union all

    对同一个表的union all只查询一次源表,Hive本身对这种union all做过优化。

    selecturl,session_id from

    (selecturl,session_id

    from trackinfowhere ds=’2013-11-01’

    union all

    selecturl,session_id

    from trackinfowhere ds=’2013-11-02’

    )t;

    l  Multi-insert(Multi-group by一定会和Multi-insert一起使用,同一源表,可按照不同where、不同group by进行计算)

    条件:源表相同,上方的SQL等同于:

    from trackinfo

     insert overwrite table tmp_testpartition(step=1)

    select url,session_id where ds=’2013-11-01’

    insert overwrite table tmp_test partition(step=2)

    selecturl,session_id where ds=’2013-11-02’;

    Hive Job优化

    l  并行化执行

    每个查询被hive转化成多个阶段,有些阶段关联性不大,则可以并行化执行,减少执行时间。

    sethive.exec.parallel=true;

    sethive.exec.parallel.thread.number=15;

    实例:

    select num

    from

    (selectcount(city) as num

    from city

    union all

    selectcount(province) as num

    fromprovince)tmp;

    union all两侧的查询语句会同时执行。

    l  本地化执行(感觉生产环境用处不大)

    sethive.exec.mode.local.auto=true;

    当一个job满足如下条件才能真正使用本地模式:

    a、   job的输入数据大小必须小于参数:

    hive.exec.mode.local.auto.inputbytes.max(默认128MB)

    b、   job的map数必须小于参数:

    hive.exec.mode.local.auto.tasks.max(默认为4)

    c、  job的reducer数必须为0或者1

    如果你的环境不满足上述条件时,执行过程会提示原因,如Input size大于hive.exec.mode.local.auto.inputbytes.max值,或input files个数大于hive.exec.mode.local.auto.tasks.max值,同时会取消本地化执行,改为其他方式执行。

    l  JVM重利用

    set mapred.job.reuse.jvm.num.tasks=15;

    JVM重利用可以是Job长时间保留slot,直到作业结束,这对于有较多任务和较多小文件的任务是非常有意义的,因为减少了JVM的启动和初始化时间,从而减少执行时间。当然这个值不能设置过大,因为有些作业会有reduce任务,如果reduce任务没有完成,则map任务占用的slot不能释放,其他作业可能就需要等待。

    l  Hive压缩数据

    中间压缩就是处理hive查询的多个job之间的数据,对于中间压缩,最好选择一个节省CPU耗时的压缩方式(即压缩率比较适中)。

    set hive.exec.compress.intermediate=true;

    set hive.intermediate.compression.codec=

    org.apache.hadoop.io.compress.SnappyCodec;

    sethive.intermediate.compression.type=BLOCK;

    Hive查询最终的输出结果文件采用压缩(落地文件的压缩率可以选择较高的压缩率)

    set hive.exec.compress.output=true;

    set mapred.output.compression.codec=

    org.apache.hadoop.io.compress.GzipCodec;

    setmapred.output.compression.type=BLOCK;

    语法(包含参数)层面优化

    l  Join

    l  Mapjoin

    l  Bucket join

    l  Group by

    l  Count(distinct)

    l  笛卡尔积

    l  提前裁剪数据,避免资源浪费

    l  Hive表的优化

    Join优化

    l  数据按照join的key进行分发,而在join左边的表的数据会首先部分或全部读入内存,如果左边表的key相对分散(单个key值数据量小,或者说相同key的数据量小),读入内存的数据会比较小,join任务执行会比较快,而如果左边的表key比较集中,而这张表的数据量又很大,那么数据倾斜就会比较严重。

                Map阶段同一key数据会分发给同一个reducer计算。

    l  join原则:

    1)       小表join大表

    在join操作的Reduce阶段(不是map阶段),位于join左边的表的内容会被加载进内存,将条目少的表放在左边,可以有效减少发生内存进出的几率。

                    解决办法:

    多个表关联时,最好分拆成几个小段,避免大sql(无法控制中间job)。

    2)       大表join大表

    大表关联中,如果join的key中含有大量null,在使用key进行hash分发时,会将数据文件中key为空的数据都分到一个节点,造成了数据倾斜。

    解决办法:

    把空值的key变成一个字符串加上随机数,把倾斜的数据分发到不同的reduce上,由于null值关联不上,处理后并不影响最终结果。

    l  Join中对join key存在大量空值的优化演示:

    end_user_id中存在大量null值。

    原始HQL:

    select u.id, t.url, t.track_time

    from end_user u

    join

    (select end_user_id,url,track_time from trackinfo where ds= '2013-12-01')t

    on u.id = t.end_user_id limit 2;

    优化后HQL为:

    select u.id, t.url, t.track_time

    from end_user u

    join

    (select case when end_user_id = 'null' or end_user_id is null

                                then cast(concat(‘00000000’,floor(rand()*1000000))as bigint)

                                else end_user_id endend_user_id,

                                url,track_time

    from trackinfo where ds= '2013-12-01') t

    on u.id = t.end_user_id limit 2;

    l  Join对数据倾斜的参数优化:

    set hive.optimize.skewjoin=true;

                       如果在join过程中出现倾斜,参数值应该设置为true。

                      

    set hive.skewjoin.key=1000000;

                       这个是join的键(key)对应的记录条数超过这个值则会进行join自动优化。

                       上面两个参数设置后的优化原理是:

                                没优化之前,join会启动一个job,但是设置优化参数后,会启动两个job。

    第一个job会将键(key)超过hive.skewjoin.key记录的键加上一些随机数等,将这些相同的key打乱,然后跑到不同的节点上面进行计算(reduce阶段)。然后再启动一个job,在第一个job处理的基础上(即第一个job的reduce输出结果)再进行计算,将相同的key分发到相同的节点上处理。

    l  Join时的关联键key的数据类型一定要相同,否则会产生数据倾斜问题

    由于test_a表中的id为字符串型,所以我们将test_b表数字类型转换成字符串类型

    select a.* fromtest_a a

    left outer jointest_b b

    On a.id =cast(b.id as string);

    Mapjoin

    l  Mapjoin(map端执行join操作):

    mapjoin的计算原理:

    mapjoin会把小表全部读入内存中,在map阶段直接拿另外一个表的数据和内存中表数据做匹配。

    l  Join 操作在Map阶段完成,如果需要的数据在Map的过程中可以处理掉,则不再需要Reduce阶段,加快了执行效率。

    小表关联一个超大表时,容易发生数据倾斜,可以用Mapjoin把小表全部加载到内存,并在map端进行join操作,避免reducer处理。

    如:

    insert overwrite table page_pv

    select /*+ MapJoin(pt)*/

                       pt.page_id,count(t.url) pv

                from rpt_page_type pt

    join

    (select url_page_id, url from trackinfo where ds = '2013-10-11') t

    on pt.page_id = t.url_page_id;

    l  mapjoin的使用场景:

    1)      关联操作中有一张表非常小

    2)      不等值的连接操作

    l  Mapjoin两种使用方式:

    1)      通过参数设置,Hive自动选择执行Mapjoin操作

    hive.auto.convert.join=true;

                          hive.mapjoin.smalltable.filesize=25000000;-------默认为25MB

    原理:将小于hive.mapjoin.smalltable.filesize数值的表加载到分布式缓存中,这样整个集群节点上map端任务都可以访问缓存中的数据。

                               

    2)      另外一种方式,可以不设置参数,通过hint方式指定:

    select /*+ mapjoin(test_b) */a.key,a.value fromtest_a a join test_b b on a.key = b.key;

    l  Mapjoin其他参数设置

    set hive.mapjoin.cache.numrows=25000;

    说明:mapjoin存在内存里的数据量。

    set hive.mapjoin.followby.gby.localtask.max.memory.usage=0.55;

    说明:map join做group by操作时,可以使用多大的内存来存储数据,如果数据太大,则不会保存在内存里。

             

    set hive.mapjoin.localtask.max.memory.usage=0.90;

    说明:本地任务可以使用内存的百分比

    bucket join

    l  使用bucket join需要满足下面两个条件

    (1)      两个表以相同方式(key)划分桶

    (2)      两个表的桶个数是倍数关系

    create table order(cidint,price decimal(18,2)) clustered by (cid) into 32 buckets;

    create tablecustomer(id int,first string,last string) clustered by (id) into 32(or 64……)buckets;

    select pricefrom order o join customer c on o.cid = c.id;

    说明:

    查询语法与普通表一样,但是底层执行却不一样。根据key只会查找对应的桶即可,比如:如果cid=1,那么只会从customer中查找id=1的数据,这些数据都位于一个桶中,所以只需访问一个桶即可。

    group by

    l  Map端部分聚合:

                       并不是所有的聚合操作都需要在Reduce端完成,很多聚合操作都可以先在Map端进行部分聚合,最后在Reduce端得出最终结果。

    l  Map端部分聚合参数:

        #是否在map端进行聚合

                       hive.map.aggr=true;

    l  有数据倾斜的时候进行负载均衡

    #如果group by过程中出现倾斜,应该设置为true

        hive.groupby.skewindata=true;

                      

    #在map端进行聚合操作的条目数目

                       #这个是group by的键对应的记录条数超过这个值则会进行优化

                       hive.groupby.mapaggr.checkinterval=100000;

    和mapjoin类似,group by优化后也会启动两个Job。

    当选项设为true时,生成的查询计划会有两个MR job,第一个MR job中,Map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group by key有可能被分发到不同的Reduce中,从而达到负载均衡的目的。

    第二个MR job再根据预处理的数据结果按照group bykey分布到Reduce中(这个过程可以保证相同的group by 被分发到同一个Reduce中),最后完成最终的聚合操作。

    Count(distinct)

    l  当该字段存在大量值为null或空的记录时容易造成倾斜。

    解决思路:

    1)  count(distinct)时,将值为空的数据在where里过滤掉,在最后结果中加1。

    2)  如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union

    3)  如果group by维度过小,则可以  采用count和group by的方式来替换count(distinct)完成计算

    l  特殊情况特殊处理:

    在业务逻辑优化效果不大情况下,有些时候是可以将倾斜的数据单独拿出来处理,最后union回去。

    l  countdistinct优化:

    实例1:

             优化前:

             selectcount(distinct id) from student;

             只有一个job任务,而且只有一个reduce,处理的工作量比较大。

            

    优化后:

             selectcount(1) from (select distinct id from student) tmp;

             或

             selectcount(1) from (select id from student group by id) tmp;

              可以通过设置set mapred.reduce.tasks的值,加快(select distinct id from student) tmp部分的处理。

    实例2:

    优化前:

    selecta,sum(b),count(distinct c),count(distinct d)

    from test

    group by a;

    优化后:

    select a,sum(b)as b,count(c) as c,count(d) as d

    from (

    select a,0 as b,c,null as d from test group by a,c

    union all

    select a,0 as b,null as c,d from test group by a,d

    union all

    select a,b,null as c,null as d from test

    )tmp group bya;

    笛卡尔积

    尽量避免笛卡尔积,join的时候不加on条件,或者无效的on条件,Hive只能使用1个reducer来完成笛卡尔积。

    之前我有遇到过一种情况,不得不使用笛卡尔积,表关联的条件为不等式,还好两张表不大。如果你不得不使用笛卡尔积,那么一定要看一下其中的表是否符合Mapjoin的要求,如果符合,那么一定要使用Mapjoin。

    提前裁剪数据,避免资源浪费

    join优化前:

    select o.cid,c.id

    from order o

    join customer c

    on o.cid = c.id

    where o.dt = '2015-07-26';

    join优化后:

    select o.cid,c.id

    from

    (select cid

    from order

    where dt = '2015-07-26'

    )o

    join customer c

    on o.cid = c.id;

    对一些过滤条件,能尽早过滤的就尽早过滤,减少IO资源浪费。

    这个需要个人工作中注意就好了。

    Hive表的优化

    l  分区:

    1)      静态分区

    2)      动态分区

    set hive.exec.dynamic.partition=true;

    set hive.exec.dynamic.partition.mode=nonstrict;

    l  分桶

    sethive.enforce.bucketing=true;

    sethive.enforce.sorting=true;

    l  数据

    相同数据尽量聚集在一起,和分桶原理类似,尽量减少网络数据传输

  • 相关阅读:
    鳥哥的 Linux 私房菜——第十三章、学习 Shell Scripts(转发)(未完待续)
    鳥哥的 Linux 私房菜——第十六章、例行性工作排程 (crontab) (转发)(未完待续)
    RT-Thread ------ event 事件
    sscanf() ------ 获取字符串中的参数
    燃气热水器的调节
    Adobe Illustrator CC ------ AI脚本插件合集
    你真的理解CSS的linear-gradient?
    IDEA中Grep Console插件的安装及使用
    Windows下删除以.结尾文件夹的方法
    lwip库的发送和接收函数
  • 原文地址:https://www.cnblogs.com/gudaozi/p/8427100.html
Copyright © 2011-2022 走看看