zoukankan      html  css  js  c++  java
  • 1

    1.1 Map端文件合并减少Map任务数量

    一般来说,HDFS的默认文件块大小是128M,如果在Hive执行任务时,发现Map端的任务过多,且执行时间多数不超过一分钟,建议通过参数,划分(split)文件的大小,合并小文件。如:

    1 set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
    2 set mapreduce.input.fileinputformat.split.minsize=516000000; -- 516M
    3 set mapreduce.input.fileinputformat.split.maxsize=1280000000; -- 1280M
    4 set mapreduce.input.fileinputformat.split.minsize.per.node=516000000;
    5 set mapreduce.input.fileinputformat.split.minsize.per.rack=516000000;

    这样可以减小map的任务数,可以减少中间临时文件的产生,并且也较少reduce阶段的任务数和产生的文件数。

    1.2 Reduce阶段数量调整
    若指定mapred.reduce.tasks参数,则用该参数值;mapred.reduce.tasks默认值为-1,表示自动计算;

    若未指定mapred.reduce.tasks,Hive会自动计算reduce个数,基于以下两个配置:

    hive.exec.reducers.bytes.per.reducer:每个reduce任务处理的数据量,默认为1G
    hive.exec.reducers.max:每个任务最大的reduce数,集群默认为50

    reducer数的计算公式如下:

    1 reduce_number = min(input_size / hive.exec.reducers.bytes.per.reducer, hive.exec.reducers.max)

    建议一般使用下面2个配置进行reduce阶段任务数量控制:

    set hive.exec.reducers.bytes.per.reducer = 1000000000; - (Generally, the default value of 1G is enough, generally no need to modify)
    set hive.exec.reducers.max = 50; - (up to 100 reduce)

    如果reduce阶段任务运行慢,可以考虑增加hive.exec.reducers.max的数量,但不宜过多,过多会产生小文件。

    1.3 Reduce阶段内存调整

    在我们减少Map端的任务数时 ,在Reduce阶段会存在任务数据量大的问题,可能会超过Reduce阶段默认的内存,从而导致OOM。

    我们可以通过调整Reduce阶段的内存进行优化:

    1 set mapreduce.reduce.memory.mb=8192; - The default value is 1024, the unit is M, here is set to 8192M
    2 set mapreduce.reduce.java.opts=-Xmx6144m; - Set the memory of the Reduce phase jvm, generally 0.75 times the previous parameter

    二、MapReduce压缩优化
    2.1 Map端数据压缩,减少Reduce copy阶段时间
    因为在Reduce过程,需要从Map阶段产生的文件的机器进行copy(非local机器)数据,这个过程是依赖网络带宽的,整个过程是非常缓慢,所以可以对Map端产生的文件进行压缩处理,降低带宽的限制。

    1 set hive.exec.compress.intermediate = true; - Enable compression on intermediate data
    2 set mapreduce.map.output.compress=true; - Whether to start compression (true/false)
    3 set mapreduce.map.output.compress.codec = org.apache.hadoop.io.compress.SnappyCodec; - Specify Snappy compression format

    当然在开启Map压缩,会有一定的CPU消耗,但MapReduce任务是IO型的任务,消耗掉CPU也不是很要求,如果是计算密集型的计算,不建议开启CPU(计算资源宝贵呀)。

    2.2 Reduce端数据压缩,减小文件块大小,节约存储资源
    1 set hive.exec.compress.output=true; - Enable hive final output data compression function
    2 set mapreduce.output.fileoutputformat.compress=true; - Turn on mapreduce final output data compression
    3 set mapreduce.output.fileoutputformat.compress.codec =org.apache.hadoop.io.compress.SnappyCodec; - Set the compression method of mapreduce final data output
    4 set mapreduce.output.fileoutputformat.compress.type=BLOCK; - Set mapreduce final data output compression to block compression

    join导致的数据倾斜
    如果数据表的key本身分布不匀,在join时,特别容易出现数据倾斜的问题。一般出现该问题有2种情况:

    3.1.1 小表与大表进行join

    一般建议,在表与表进行join时,我们将小表放在左边效率会更高些。

    如果小表足够小(如:一张维度表),是可以加载到内存中的。直接在Map端进行join,避免数据shuffle过程,导致数据倾斜问题。不过需要设置下面参数:

    1 set hive.auto.convert.join = true; -- 将小表进行map-join操作
    2 set hive.mapjoin.smalltable.filesize = 2500000;-- 默认值25M,如果表的大小小于此值就会被加载进内存中

    也可以使用两个配置参数下面的2个配置,启用自动联接,不再需要在查询中提供 Map 联接提示。

    1 set hive.auto.convert.join.noconditionaltask = true; -- 表示启用了自动转换
    2 set hive.auto.convert.join.noconditionaltask.size = 10000000; 

    3.1.2 两张同量级的表进行join

    如果是2张同量级的表进行join产生数据倾斜问题,可以尽量从业务数据层面去解决,其次是调整Hive参数。

    1.一般来讲,这种情况是某些key在join时,数据量大导致分布不匀,将key加上一个随机数在进行join。如:

    1 select a.key as key,value1,value2 
    2 from 
    3 (select key,value1,concat(key,cast(rand() * 10 as int)) as key_1 
    4 from table1) a
    5 left join
    6 (select key,value2,concat(key,cast(rand() * 10 as int)) as key_1
    7 from table2) b
    8 on a.key_1 = b.key_1;

    或者是拆成2个部分,将key量大的部分过滤出来,在分别处理。

    不过在此之前对数据先进行预处理,过滤些缺失值和无效值,防止这种数据导致数据倾斜。

    2.通过Hive参数处理,设置参数:

    1 set hive.optimize.skewjoin = true; -- 有数据倾斜的时候进行负载均衡
    2 set hive.skewjoin.key = 100000; -- 默认是100000,默认的1个reduce 只处理1G 

    hive.optimize.skewjoin参数是对join时,key超过设置的hive.skewjoin.key的数量是,就会将该key的数据存储至HDFS,然后运行一个map任务进行处理。防止数据倾斜。

    与此类似的有group by操作的数据倾斜处理:hive.groupby.skewindata=true,不过这个参数不适用在多个维度distinct,不然会报错。

    3.2 group by导致的数据倾斜
    一般可以通过下面参数调节:

    1 set hive.map.aggr=true; -- Map 端部分聚合,相当于Combiner
    2 set hive.groupby.skewindata=true;

    hive.groupby.skewindata=true时,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。

    不过这个参数不适用在多个维度distinct,不然会报错。

    四、动态分区插入优化
    动态分区参数:

    1 set hive.exec.dynamic.partition = true; -- 设置开启动态分区
    2 set hive.exec.dynamic.partition.mode = nonstrict; --设置为非严格模式
    3 set hive.exec.max.dynamic.partitions.pernode=100; -- 默认值 100
    4 set hive.exec.max.dynamic.partitions = 1000; -- 设置动态分区最大个数
    5 set hive.exec.max.created.files = 100000; --设置整个MR Job中最大可以创建多少个HDFS文件,默认值100000

    当我们insert overwrite时,有时会插入动态分区中,当有的分区的数量非常大是,这时会导致reduce阶段的某task任务非常的慢,可以考虑使用分桶加随机数的方式,进行优化,如:

    1 DISTRIBUTE BY 动态分区 + cast(rand()*10 as int) -- 生成reduce阶段
    2 
    3 示例:
    4 insert overwrite table_name partiton(dt,country)
    5 select value,dt,country from temp_table 
    6 DISTRIBUTE BY dt,country,cast(rand() * 10 as int); -- 随机数可以根据数据倾斜程度设置

    但有一个缺点,会导致中间文件的增多,不过中间文件,一般在跑完Job会进行删除(中间文件个数 = 动态分区数 * 随机数)。

    ps: 严格模式下的动态分区插入,必须保证至少有一个静态分区。

  • 相关阅读:
    mysql六:数据备份、pymysql模块
    mysql三:表操作
    mysql四:数据操作
    剑指offer-数组
    剑指offer-回溯
    中缀表达式到后缀表达式的转换
    大数取余算法的证明及实现
    Windows下匿名管道进程通信
    Windows下使用命令行界面调用VS2015编译器编译C++程序方法
    6 个技巧,提升 C++11 的 vector 性能
  • 原文地址:https://www.cnblogs.com/blogwolf/p/15761687.html
Copyright © 2011-2022 走看看