zoukankan      html  css  js  c++  java
  • hive优化

     

    1. 概述

    1.1 hive的特征

    1. 可以通过SQL轻松访问数据的工具,从而实现数据仓库任务,如提取/转换/加载(ETL),报告和数据分析;
    2. 它可以使已经存储的数据结构化;
    3. 可以直接访问存储在Apache HDFS或其他数据存储系统(如Apache HBase)中的文件;
    4. Hive除了支持MapReduce计算引擎,还支持Spark和Tez这两种分布式计算引擎;
    5. 它提供类似sql的查询语句HiveQL对数据进行分析处理;
    6. 数据的存储格式有多种,比如数据源是二进制格式,普通文本格式等等;

    1.2 hive的优势

      hive强大之处不要求数据转换成特定的格式,而是利用hadoop本身InputFormat API来从不同的数据源读取数据,同样地使用OutputFormat API将数据写成不同的格式。所以对于不同的数据源,或者写出不同的格式就需要不同的对应的InputFormat和OutputFormat类的实现。以stored as textFile为例,其在底层java API中表现是输入InputFormat格式:TextInputFormat以及输出OutputFormat格式:HiveIgnoreKeyTextOutputFormat。这里InputFormat中定义了如何对数据源文本进行读取划分,以及如何将切片分割成记录存入表中。而OutputFormat定义了如何将这些切片写回到文件里或者直接在控制台输出。

      Hive拥有统一的元数据管理,所以和Spark、Impala等SQL引擎是通用的。通用是指,在拥有了统一的metastore之后,在Hive中创建一张表,在Spark/Impala中是能用的;反之在Spark中创建一张表,在Hive中也是能用的,只需要共用元数据,就可以切换SQL引擎,涉及到了Spark sql和Hive On Spark。

      不仅如此Hive使用SQL语法,提供快速开发的能力,还可以通过用户定义的函数(UDF),用户定义的聚合(UDAF)和用户定义的表函数(UDTF)进行扩展,避免了去写mapreducce,减少开发人员的学习成本。Hive中不仅可以使用逗号和制表符分隔值(CSV/TSV)文本文件,还可以使用Sequence File、RC、ORC、Parquet(知道这几种存储格式的区别)。当然Hive还可以通过用户来自定义自己的存储格式,基本上前面说到几种格式完全够了。Hive旨在最大限度地提高可伸缩性(通过向Hadoop集群动态田间更多机器扩展),性能,可扩展性,容错性以及与其输入格式的松散耦合。

      数据离线处理,比如日志分析,海量数据结构化分析。

    2. Hive函数

    Hive的SQL还可以通过用户定义的函数(UDF),用户定义的聚合(UDAF)和用户定义的表函数(UDTF)进行扩展。

    当Hive提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数(UDF)。

    UDF、UDAF、UDTF的区别:

    • UDF(User-Defined-Function)一进一出
    • UDAF(User-Defined Aggregation Funcation)聚集函数,多进一出
    • UDTF(User-Defined Table-Generating Functions)一进多出,如lateral view explore()

    3. Hive优化

    3.1 慎用api

     我们知道大数据场景下不害怕数据量大,害怕的是数据倾斜,怎样避免数据倾斜,找到可能产生数据倾斜的函数尤为关键,数据量较大的情况下,慎用count(distinct),count(distinct)容易产生倾斜问题。

    3.2 自定义UDAF函数优化

      sum,count,max,min等UDAF,不怕数据倾斜问题,hadoop在map端汇总合并优化,是数据倾斜不成问题。

    3.3 设置合理的map reduce的task数量

    3.3.1 map阶段优化

    mapred.min.split.size: 指的是数据的最小分割单元大小;min的默认值是1B
    mapred.max.split.size: 指的是数据的最大分割单元大小;max的默认值是256MB
    通过调整max可以起到调整map数的作用,减小max可以增加map数,增大max可以减少map数。
    需要提醒的是,直接调整mapred.map.tasks这个参数是没有效果的。

     举例:

      a) 假设input目录下有1个文件a,大小为780M,那么hadoop会将该文件a分隔成7个块(6个128M的块和1个12M的块),从而产生7个map书;

      b) 假设input目录下有3个文件a,b,c,大小分别为10M,20M,130M,那么hadoop会分隔成4个块(10M,20M,128M,2M),从而产生4个map数;

      注意:如果文件大于块大小(128M),那么会拆分,如果小于块大小,则把该文件当成一个块。

      其实这就涉及到小文件的问题:如果一个任务有很多小文件(远远小于块大小128M),则每个小文件也会当做一个块,用一个map任务来完成。

      而一个map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的map数是受限的。那么,是不是保证每个map处理接近128M的文件块,就高枕无忧了?答案也是不一定。比如有一个127M的文件,正常会用一个map去完成,但这个文件只有一个或者两个小字段,却有几千万的记录,如果map处理的逻辑比较复杂,用一个map任务去做,肯定也比较耗时。 

       我们该如何去解决呢???

      我们需要采取两种方式来解决:即减少map数和增加map数

    • 减少map数量
    复制代码
    假设一个SQL任务:
    Select count(1) from popt_tbaccountcopy_meswhere pt = '2012-07-04';
    该任务的inputdir :  /group/p_sdo_data/p_sdo_data_etl/pt/popt_tbaccountcopy_mes/pt=2012-07-04
    共有194个文件,其中很多事远远小于128M的小文件,总大小9G,正常执行会用194个map任务。
    Map总共消耗的计算资源:SLOTS_MILLIS_MAPS= 623,020
    
    通过以下方法来在map执行前合并小文件,减少map数:
    set mapred.max.split.size=100000000;
    set mapred.min.split.size.per.node=100000000;
    set mapred.min.split.size.per.rack=100000000;
    set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
    再执行上面的语句,用了74个map任务,map消耗的计算资源:SLOTS_MILLIS_MAPS= 333,500
    对于这个简单SQL任务,执行时间上可能差不多,但节省了一半的计算资源。
    大概解释一下,100000000表示100M, 
    set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;这个参数表示执行前进行小文件合并,
    前面三个参数确定合并文件块的大小,大于文件块大小128m的,按照128m来分隔,
    小于128m,大于100m的,按照100m来分隔,把那些小于100m的(包括小文件和分隔大文件剩下的),
    进行合并,最终生成了74个块。
    复制代码
    • 增大map数量
    复制代码
    如何适当的增加map数?
    当input的文件都很大,任务逻辑复杂,map执行非常慢的时候,可以考虑增加Map数,
    来使得每个map处理的数据量减少,从而提高任务的执行效率。
    
     假设有这样一个任务:
        Select data_desc,
                   count(1),
                   count(distinct id),
                   sum(case when ...),
                   sum(case when ...),
                   sum(...)
        from a group by data_desc
    
    如果表a只有一个文件,大小为120M,但包含几千万的记录,如果用1个map去完成这个任务,肯定是比较耗时的,
    这种情况下,我们要考虑将这一个文件合理的拆分成多个,
    这样就可以用多个map任务去完成。
        set mapred.reduce.tasks=10;
          create table a_1 as 
          select * from a 
          distribute by rand(123);
    
    这样会将a表的记录,随机的分散到包含10个文件的a_1表中,再用a_1代替上面sql中的a表,则会用10个map任务去完成。
    每个map任务处理大于12M(几百万记录)的数据,效率肯定会好很多。
    复制代码

      注意:看上去,貌似这两种有些矛盾,一个是要合并小文件,一个是要把大文件拆成小文件,这点正是重点需要关注的地方,使单个map任务处理合适的数据量;

    3.3.2 reduce阶段优化

      Reduce的个数对整个作业的运行性能有很大影响。如果Reduce设置的过大,那么将会产生很多小文件,对NameNode会产生一定的影响,而且整个作业的运行时间未必会减少;如果Reduce设置的过小,那么单个Reduce处理的数据将会加大,很可能会引起OOM异常。

      如果设置了mapred.reduce.tasks/mapreduce.job.reduces参数,那么Hive会直接使用它的值作为Reduce的个数;如果mapred.reduce.tasks/mapreduce.job.reduces的值没有设置(也就是-1),那么Hive会根据输入文件的大小估算出Reduce的个数。根据输入文件估算Reduce的个数可能未必很准确,因为Reduce的输入是Map的输出,而Map的输出可能会比输入要小,所以最准确的数根据Map的输出估算Reduce的个数。

    1. Hive自己如何确定reduce数:

      reduce个数的设定极大影响任务执行效率,不指定reduce个数的情况下,Hive会猜测确定一个reduce个数,基于以下两个设定:

      hive.exec.reducers.bytes.per.reducer(每个reduce任务处理的数据量,默认为1000^3=1G)

      hive.exec.reducers.max(每个任务最大的reduce数,默认为999)

      计算reducer数的公式很简单N=min(参数2,总输入数据量/参数1)

      即,如果reduce的输入(map的输出)总大小不超过1G,那么只会有一个reduce任务;

    如:select pt,count(1) from popt_tbaccountcopy_mes where pt = '2012-07-04' group by pt; 
                /group/p_sdo_data/p_sdo_data_etl/pt/popt_tbaccountcopy_mes/pt=2012-07-04 总大小为9G多,
      因此这句有10个reduce

    2. 调整reduce个数方法一:

      调整hive.exec.reducers.bytes.per.reducer参数的值;

      set hive.exec.reducers.bytes.per.reducer=500000000; (500M)

      select pt, count(1) from popt_tbaccountcopy_mes where pt = '2012-07-04' group by pt;

      这次有20个reduce

    3. 调整reduce个数方法二:

      set mapred.reduce.tasks=15;

      select pt,count(1) from popt_tbaccountcopy_mes where pt = '2012-07-04' group by pt;

      这次有15个reduce

    4. reduce个数并不是越多越好;

      同map一样,启动和初始化reduce也会消耗时间和资源;

      另外,有多少个reduce,就会有个多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;

    5. 什么情况下只有一个reduce;

      很多时候你会发现任务中不管数据量多大,不管你有没有调整reduce个数的参数,任务中一直都只有一个reduce任务;其实只有一个reduce任务的情况,除了数据量小于hive.exec.reducers.bytes.per.reducer参数值的情况外,还有以下原因:

    • 没有group by的汇总,比如把select pt,count(1) from popt_tbaccountcopy_mes where pt = ‘2012-07-04’ group by pt; 写成select count(1) from popt_tbaccountcopy_mes where pt = ‘2012-07-04’; 这点非常常见,希望大家尽量改写。
    • 用了Order by
    • 有笛卡尔积。

      注意:在设置reduce个数的时候也需要考虑这两个原则:使大数据量利用合适的reduce数;是单个reduce任务处理合适的数据量;

    3.4 小文件合并优化

      我们知道文件数目小,容易在文件存储端造成瓶颈,给HDFS带来压力,影响处理效率。对此,可以通过合并Map和Reduce的结果文件来消除这样的影响。

      用于设置合并的参数有:

      • 是否合并Map输出文件:hive.merge.mapfiles=true(默认值为true)
      • 是否合并Reduce端输出文件:hive.merge.mapredfiles=false(默认值为false)
      • 合并文件的大小:hive.merge.size.per.task=256*1000*1000(默认值为256000000)

    3.4.1 Hive优化之小文件问题及其解决方案:

      小文件是如何产生的:

      • 动态分区插入数据,产生大量的小文件,从而导致map数量剧增;
      • reduce数量越多,小文件也越多(reduce的个数和输出文件是对应的);
      • 数据源本身就包含大量的小文件。

      小文件问题的影响:

      • 从Hive的角度看,小文件会开很多map,一个map开一个JVM去执行,所以这些任务的初始化,启动,执行会浪费大量的资源,严重影响性能。
      • 在HDFS中,每个小文件对象约占150byte,如果小文件过多会占用大量内存。这样NameNode内存容量严重制约了集群的扩展。

      小文件问题的解决方案:

        从小文件产生的途径就可以从源头上控制小文件数量,方法如下:

      • 使用Sequencefile作为表存储格式,不要用textfile,在一定程度上可以减少小文件;
      • 减少reduce的数量(可以使用参数进行控制);
      • 少用动态分区,用时记得按distribute by分区;

        对于已有的小文件,我们可以通过以下几种方案解决:

      • 使用hadoop archive命令把小文件进行归档;
      • 重建表,建表时减少reduce数量;
      • 通过参数进行调节,设置map/reduce端的相关参数,如下:   
    复制代码
    //每个Map最大输入大小(这个值决定了合并后文件的数量)  
    set mapred.max.split.size=256000000;    
    //一个节点上split的至少的大小(这个值决定了多个DataNode上的文件是否需要合并)  
    set mapred.min.split.size.per.node=100000000;  
    //一个交换机下split的至少的大小(这个值决定了多个交换机上的文件是否需要合并)    
    set mapred.min.split.size.per.rack=100000000;  
    //执行Map前进行小文件合并  
    set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;   
    
    设置map输出和reduce输出进行合并的相关参数:
    [java] view plain copy
    //设置map端输出进行合并,默认为true  
    set hive.merge.mapfiles = true  
    //设置reduce端输出进行合并,默认为false  
    set hive.merge.mapredfiles = true  
    //设置合并文件的大小  
    set hive.merge.size.per.task = 256*1000*1000  
    //当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件merge。  
    set hive.merge.smallfiles.avgsize=16000000 
    复制代码

    3.5 SQL优化

    3.5.1 列裁剪

      Hive在读数据的时候,可以只读取查询中所需要用到的列,而忽略其他列。例如,若有以下查询:

    SELECT a,b FROM q WHERE e<10;

      在实施此项查询中,Q表有5列(a,b,c,d,e),Hive只读取查询逻辑中真实需要的3列a、b、e, 而忽略列c,d;这样做节省了读取开销,中间表存储开销和数据整合开销。

      裁剪对应的参数项为:hive.optimize.cp=true(默认值为真)

    3.5.2 分区裁剪

      可以在查询的过程中减少不必要的分区。例如,若有以下查询:

    SELECT * FROM (SELECT a1, COUNT(1) FROM T GROUP BY a1) subq WHERE subq.prtn=100; # (多余分区)
    SELECT * FROM T1 JOIN (SELECT * FROM T2) subq ON (T1.a1=subq.a2) WHERE subq.prtn=100;

      查询语句若将"subq.prtn=100"条件放入子查询中更为高效,可以减少读入的分区数目。Hive自动执行这种裁剪优化。

      分区参数为:hive.optimize.pruner=true(默认值为真)

    3.5.3 熟练使用SQL提高查询

      熟练地使用SQL,能写出高效率的查询语句。

      场景:有一张user表,为卖家每天收到表,user_id,ds(日期)为key,属性有主营类目,指标有交易金额,交易笔数。每天要取前10天的总收入,总笔数,和最近一天的主营类目。

      解决方法 1 如下所示:常用方法

    复制代码
    INSERT OVERWRITE TABLE t1
    SELECT user_id, substr(MAX(CONCAT(ds,cat),9) AS main_cat) FROM users
    WHERE ds=20120329 // 20120329 为日期列的值,实际代码中可以用函数表示当天日期GROUP BY user_id;
    
    INSERT OVERWRITE TABLE t2
    SELECT user_id,sum(qty) AS qty, SUM(amt) AS amt FROM users
    WHERE ds BETWEEN 20120301 AND 20120329
    GROUP BY user_id;
    
    SELECT t1.user_id, t1.main_cat, t2.qty, t2.amt FROM t1
    JOIN t2 ON t1.user_id=t2.user_id
    复制代码

      下面给出方法1的思路,实现步骤如下:

      第一步:利用分析函数,取每个user_id最近一天的主营类目,存入临时表t1;

      第二步:汇总10天的总交易金额,交易笔数,存入临时表t2;

      第三步:关联t1、t2,得到最终的结果。

      解决方法 2 如下所示:优化方法

    SELECT user_id, substr(MAX(CONCAT(ds, cat)), 9) AS main_cat, SUM(qty), SUM(amt) FROM users
    WHERE ds BETWEEN 20120301 AND 20120329
    GROUP BY user_id

      在工作中我们总结出:方案2的开销等于方案1的第二步开销,性能提升,由原有的25分钟完成,缩短为10分钟以内完成。节省了两个临时表的读写是一个关键原因,这种方式也适用于Oracle中的数据查找工作。

      SQL具有普适性,很多SQL通用的优化方案在Hadoop分布式计算方式中也可以达到效果。

    3.5.5 不同数据类型关联产生的倾斜问题

      问题:不同数据类型id的关联会产生数据倾斜问题。

      一张表的s8的日志,每个商品一条记录,要和商品表关联。但关联却碰到倾斜的问题。s8的日志中有32位字符串商品id,也有数值商品id,日志中类型是string的,但商品中的数值id是bigint的。猜想问题的原因是把s8的商品id转成数值id做hash来分配Reduce,所以字符串id的s8日志,都到一个Reduce上了,解决的方法验证了这个猜测。

      解决方法:把数据类型转换成字符串类型

    SELECT * FROM s8_log a LEFT OUTER
    JOIN r_auction_auctions b ON a.auction_id=CAST(b.auction_id AS STRING)

      调优结果显示:数据表处理由1小时30分钟经代码调整后可以在20分钟内完成。

    3.5.6 利用Hive对UNION ALL优化的特性

      多表union all会优化成一个job。

      问题:比如推广效果表要和商品表关联,效果表中的auction_id列既有32位字符串商品id,也有数字id,和商品表关联得到商品的信息。

      解决方法:Hive SQL性能会比较好

    SELECT * FROM effect a
    JOIN
    (SELECT auction_id AS auction_id FROM auctions
    UNION ALL
    SELECT auction_string_id AS auction_id FROM auctions) b
    ON a.auction_id=b.auction_id

      比分别过滤数字id,字符串id然后分别和商品表关联性能要好。

      这样写的好处:1个MapReduce作业,商品表只读一次,推广效果表只读取一次。把这个SQL换成Map/Reduce代码的话,Map的时候,把a表的记录打上标签a,商品表记录每读取一条,打上标签b,变成两个<key, value>对,<(b,数字id),value>,<(b,字符串id),value>。

      所以商品表的HDFS读取只会是一次。

    3.5.7 解决Hive对UNION ALL优化的短板

      Hive对union all的优化的特性:对union all优化只局限于非嵌套查询

    • 消灭子查询内的group by

      示例1:子查询内有group by

    SELECT * FROM
    (SELECT * FROM t1 GROUP BY c1,c2,c3 UNION ALL SELECT * FROM t2 GROUP BY c1,c2,c3) t3
    GROUP BY c1,c2,c3

      从业务逻辑上说,子查询内的GROUP BY怎么看都是多余(功能上的多余,除非有COUNT(DISTINCT)),如果不是因为Hive Bug或者性能上的考量(曾经出现如果不执行子查询GROUP BY,数据得不到正确的结果的Hive Bug)。所以这个Hive按经验转换成如下所示:

    SELECT * FROM (SELECT * FROM t1 UNION ALL SELECT * FROM t2) t3 GROUP BY c1,c2,c3

      调优结果:经过测试,并未出现union all的Hive Bug,数据是一致的。MapReduce的作业数由3减少到1。

      t1相当于一个目录,t2相当于一个目录,对Map/Reduce程序来说,t1、t2可以作为Map/Reduce作业的mutli inputs。这可以通过一个Map/Reduce来解决这个问题。Hadoop的计算框架,不怕数据多,就怕作业数多。

      但如果换成是其他计算平台如Oracle,那就不一定了,因为把大输入拆成两个输入,分别排序汇总成merge(假如两个子排序是并行的话),是有可能性能更优的(比如希尔排序比冒泡排序的性能更优)。

    • 消灭子查询内的COUNT(DISTINCT),MAX,MIN
    SELECT * FROM 
    (SELECT * FROM t1
    UNION ALL SELECT c1,c2,c3 count(DISTINCT c4) FROM t2 GROUP BY c1,c2,c3) t3
    GROUP BY c1,c2,c3

      由于子查询里头有COUNT(DISTINCT)操作,直接去GROUP BY将达不到业务目标。这时采用临时表消灭COUNT(DISTINCT)作业不但能解决倾斜问题,还能有效减少jobs。

    INSERT t4 SELECT c1,c2,c3,c4 FROM t2 GROUP BY c1,c2,c3;
    SELECT c1,c2,c3,SUM(income),SUM(uv) FROM
    (SELECT c1,c2,c3,income,0 AS uv FROM t1
    UNION ALL
    SELECT c1,c2,c3,0 AS income, 1 AS uv FROM t2) t3
    GROUP BY c1,c2,c3;

      job数是2,减少一半,而且两次Map/Reduce比COUNT(DISTINCT)效率更高。

      调优结果:千万级别的类目表,member表,与10亿级的商品表关联。原先1963s的任务经过调整,1152s即完成。

    • 消灭子查询内的JOIN  
    SELECT * FROM
    (SELECT * FROM t1 UNION ALL SELECT * FROM t4 UNION ALL SELECT * FROM t2 JOIN t3 ON t2.id=t3.id) x
    GROUP BY c1,c2;

      上面代码运行会有5个jobs。加入先JOIN生存临时表的话t5,然后UNION ALL,会变成2个jobs。

    INSERT OVERWRITE TABLE t5
    SELECT * FROM t2 JOIN t3 ON t2.id=t3.id;
    SELECT * FROM (t1 UNION ALL t4 UNION ALL t5);

      调优结果显示:针对千万级别的广告位表,由原先5个Job共15分钟,分解为2个job,一个8-10分钟,一个3分钟。

    3.5.8 COUNT(DISTINCT)

      计算uv的时候,经常会用到COUNT(DISTINCT),但在数据比较倾斜的时候COUNT(DISTINCT)会比较慢。这时可以尝试用GROUP BY改写代码计算uv。数据量小的时候无所谓,数据量大的情况下,由于COUNT DISTINCT操作需要用一个Reduce Task来完成,这一个Reduce需要处理的数据量太大,就会导致整个Job很难完成,一般COUNT DISTINCT使用先GROUP BY再COUNT的方式替换:

    • 原有代码
    ①
    INSERT OVERWRITE TABLE s_dw_tanx_adzone_uv PARTITION (ds=20120329) 
    SELECT 20120329 AS thedate,adzoneid,COUNT(DISTINCT acookie) AS uv FROM s_ods_log_tanx_pv t WHERE t.ds=20120329 GROUP BY adzoneid;
    
    ②
    select count(distinct id) from bigtable;

      关于COUNT(DISTINCT)的数据倾斜问题不能一概而论,要依情况而定,下面是我测试的一组数据:

      测试数据:169857条

    复制代码
    ①
    #统计每日IP
    CREATE TABLE ip_2014_12_29 AS SELECT COUNT(DISTINCT ip) AS FROM logdfs WHERE logdate='2014_12_29';
    耗时:24.805 seconds
    #统计每日IP(改造)
    CREATE TABLE ip_2014_12_29 AS SELECT COUNT(1) AS IP FROM (SELECT DISTINCT ip from logdfs WHERE logdate='2014_12_29') tmp;
    耗时:46.833 seconds
    
    ②
     select count(id) from (select id from bigtable group by id) a;
    复制代码

    测试结果表明:明显改造后的语句比之前耗时,这时因为改造后的语句有2个SELECT,多了一个job,这样在数据量小的时候,数据不会存在倾斜问题。

    3.5.9 JOIN操作

    3.5.9.1 小表、大表JOIN

      在使用写有Join操作的查询语句时有一条原则:应该将条目少的表/子查询放在Join操作符的左边。原因是在Join操作的Reduce阶段,位于Join操作符左边的表的内容会被加载进内存,将条目少的表放在左边,可以有效减少发生OOM错误的几率;再进一步,可以使用Group让小的维度表(1000条以下的记录条数)先进内存。在map端完成reduce。

      实际测试发现:新版的hive已经对小表JOIN大表和大表JOIN小表进行了优化。小表放在左边和右边已经没有明显区别。

      案例实操:

      (1)关闭mapjoin功能(默认是打开的)

        set hive.auto.convert.join=false;

      (2)执行小表JOIN大表语句

    insert overwrite table jointable
    select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
    from smalltable s
    left join bigtable  b
    on b.id = s.id;

      Time taken: 35.921 seconds

      (3)执行大表JOIN大表语句

    insert overwrite table jointable
    select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
    from bigtable  b
    left join smalltable  s
    on s.id = b.id;

      Time taken: 34.196 seconds;

    3.5.9.2 大表JOIN大表

    1)空Key过滤

      问题:日志中常会出现信息丢失,比如每日约为20亿的全网日志,其中的user_id为主键,在日志收集过程中会丢失,出现主键为null的情况,如果取其中的user_id和bmw_users关联,就会碰到数据倾斜的问题。原因是Hive中,主键为null值的项会被当做相同的Key而分配进同一个计算Map。

      解决方法1:user_id为空的不参与关联,子查询过滤null

    SELECT * FROM log a
    JOIN bmw_users b ON a.user_id IS NOT NULL AND a.user_id=b.user_id
    UNION ALL SELECT * FROM log a WHERE a.user_id IS NULL

      解决方法2 如下所示:函数过滤null

    SELECT * FROM log a LEFT OUTER
    JOIN bmw_users b ON
    CASE WHEN a.user_id IS NULL THEN CONCAT('dp_hive', RAND()) ELSE a.user_id END = b.user_id;

      调优结果:原先由于数据倾斜导致运行时长超过1小时,解决方法1运行每日平均时长25分钟,解决方法2运行的每日平均时长在20分钟左右。优化效果很明显。

      我们在工作中总结出:解决方法2比解决方法1效果更好,不但IO少了,而且作业数也少了。解决方法1中log读取两次,job数为2。解决方法2中job数是1。这个优化适合无效id(比如-99,‘’,null等)产生的倾斜问题。把空值的key变成一个字符串加上随机数,就能把倾斜的数据分到不同的Reduce上,从而解决数据倾斜问题。因为空值不参与关联,即使分到不同的Reduce上,也不会影响最终的结果。附上Hadoop通用关联的实现方法是:关联通过二次排序实现的,关联的列为partition key,关联的列和表的tag组成排序的group key,根据partition key分配Reduce。同一Reduce内根据group key排序。

    3.5.9.3 MAP JOIN操作

      如果不指定MapJoin或者不符合MapJoin的条件,那么Hive解析器会将Join操作转换成Common Join,即:在Reduce阶段完成join。容易发生数据倾斜。可以用MapJoin把小表全部加载到内存在map端进行join,避免reducer处理。

    • 开启MapJoin参数设置:

        1) 设置自动选择MapJoin

          set hive.auto.convert.join = true;默认为true

        2) 大表小表的阀值设置(默认25M一下认为是小表):

          set hive.mapjoin.smalltable.filesize=25000000;

    • MapJoin工作机制

      

      上图是Hive MapJoin的原理图,从图中可以看出MapJoin分为两个阶段:

      (1)通过MapReduce Local Task,将小表读入内存,生成内存HashTableFiles上传至Distributed Cache中,这里会对HashTableFiles进行压缩。

      (2)MapReduce Job在Map阶段,每个Mapper从Distributed Cache读取HashTableFiles到内存中,顺序扫描大表,在Map阶段直接进行Join,将数据传递给下一个MapReduce任务。也就是在map端进行join避免了shuffle。

      Join操作在Map阶段完成,不再需要Reduce,有多少个Map Task,就有多少个结果文件。

      实例:

      (1)开启MapJoin功能

        set hive.auto.convert.join = true; 默认为true

      (2)执行小表JOIN大表语句

    insert overwrite table jointable
    select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
    from smalltable s
    join bigtable  b
    on s.id = b.id;

      Time taken: 24.594 seconds

      (3)执行大表JOIN小表语句

    insert overwrite table jointable
    select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
    from bigtable  b
    join smalltable  s
    on s.id = b.id;

      Time taken: 24.315 seconds

    3.5.9.3 GROUP BY操作

      默认情况下,Map阶段同一Key数据分发给一个reduce,当一个key数据过大时就倾斜了。进行GROUP BY操作时需要注意以下几点:

    • Map端部分聚合

      事实上并不是所有的聚合操作都需要在reduce部分进行,很多聚合操作都可以先在Map端进行部分聚合,然后reduce端得出最终结果。

      (1)开启Map端聚合参数设置

        set hive.map.aggr=true

      (2)在Map端进行聚合操作的条目数目

        set hive.grouby.mapaggr.checkinterval=100000

      (3)有数据倾斜的时候进行负载均衡(默认是false)

        set hive.groupby.skewindata = true

    • 有数据倾斜时进行负载均衡

      此处需要设定hive.groupby.skewindata,当选项设定为true时,生成的查询计划有两个MapReduce任务。在第一个MapReduce中,map的输出结果集合会随机分布到reduce中,每个reduce做部分聚合操作,并输出结果。这样处理的结果是,相同的Group By Key有可能分发到不同的reduce中,从而达到负载均衡的目的;第二个MapReduce任务再根据预处理的数据结果按照Group By Key分布到reduce中(这个过程可以保证相同的Group By Key分布到同一个reduce中),最后完成最终的聚合操作。

    3.5.10 优化in/exists语句

      虽然经过测验,hive1.2.1也支持in/exists操作,但还是推荐使用hive的一个高效替代方案:left semi join

      比如说:    

          select a.id, a.name from a where a.id in (select b.id from b);
          select a.id, a.name from a where exists (select id from b where a.id = b.id);

      应该转换成:

          select a.id, a.name from a left semi join b on a.id = b.id;

    3.5.11 排序选择

    • cluster by: 对同一字段分桶并排序,不能和sort by连用;
    • distribute by + sort by: 分桶,保证同一字段值只存在一个结果文件当中,结合sort by 保证每个reduceTask结果有序;
    • sort by: 单机排序,单个reduce结果有序
    • order by:全局排序,缺陷是只能使用一个reduce

    3.6 存储格式

      可以使用列裁剪,分区裁剪,orc,parquet等这些列式存储格式,因为列式存储的表,每一列的数据在物理上是存储在一起的,Hive查询时会只遍历需要列数据,大大减少处理的数据量。

      Hive支持ORCfile,这是一种新的表格存储格式,通过诸如谓词下推,压缩等技术来提高执行速度提升。对于每个HIVE表使用ORCfile应该是一件容易的事情,并且对于获得HIVE查询的快速响应时间非常有益。

      作为一个例子,考虑两个大表A和B(作为文本存储,其中一些列未在此处指定,即行式存储的缺点)以及一个简单的查询,如:

      SELECT A.customerID,A.name,A.age,A.address join B.role,B.department,B.salary ON A.customerID=B.customerID;

      此查询可能需要很长时间才能执行,因为表A和B都以TEXT形式存储,进行全表扫描。

      将这些表格转换为ORCFile格式通常会显着减少查询时间;

      ORC支持压缩存储(使用ZLIB或如上所示使用SNAPPY),但也支持未压缩的存储。

    复制代码
    CREATE TABLE A_ORC (
      customerID int,name string,age int, address string
    ) STORED AS ORC tblproperties ("orc.compress" = "SNAPPY");
    
    INSERT INTO TABLE A_ORC SELECT * FROM A;
    
    CREATE TABLE B_ORC (
           customerID int, role string, salary float, department string
    ) STORED AS ORC tblproperties ("orc.compress" = "SNAPPY");
    
    INSERT INTO TABLE B_ORC SELECT * FROM B;
    
    SELECT A_ORC.customerID, A_ORC.name, A_ORC.age, A_ORC.address join B_ORC.role,B_ORC.department, B_ORC.salary
    ON A_ORC.customerID=B_ORC.customerID;
    复制代码

    3.7 压缩格式

      大数据场景下存储格式压缩格式尤为关键,可以提升计算速度,减少存储空间,降低网络io,磁盘io,所以要选择合适的压缩格式和存储格式,那么首先就了解这些东西。参考该博客

    3.7.1 压缩的原因

      Hive最终是转为MapReduce程序来执行的,而MapReduce的性能瓶颈在于网络IO和磁盘IO,要解决性能瓶颈,最主要的是减少数据量,对数据进行压缩是个好的方式。压缩虽然是减少了数据量,但是压缩过程要消耗CPU的,但是在Hadoop中,往往性能瓶颈不在于CPU,CPU压力并不大,所以压缩充分利用了比较空闲的CPU。

    3.7.2 常用压缩方法对比

        

      各个压缩方式所对应的Class类:

      

    3.7.3 压缩方式的选择

      压缩比率,压缩解压缩速度,是否支持Split

    3.7.4 压缩使用

      Job输出文件按照block以Gzip的方式进行压缩:

    set mapreduce.output.fileoutputformat.compress=true // 默认值是 false
    set mapreduce.output.fileoutputformat.compress.type=BLOCK // 默认值是 Record
    set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec // 默认值是 org.apache.hadoop.io.compress.DefaultCodec

      Map输出结果也以Gzip进行压缩:

    set mapred.map.output.compress=true
    set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec // 默认值是 org.apache.hadoop.io.compress.DefaultCodec 

      对Hive输出结果和中间都进行压缩:

    set hive.exec.compress.output=true // 默认值是 false,不压缩
    set hive.exec.compress.intermediate=true // 默认值是 false,为 true 时 MR 设置的压缩才启用

    3.8 引擎的选择

      Hive可以使用Apache Tez执行引擎而不是古老的Map-Reduce引擎。没有在环境中没有默认打开,在Hive查询开头将以下内容设置为‘true’来使用Tez:“设置hive.execution.engine = tez; ”,通过上述设置,你执行的每个HIVE查询都将利用Tez。目前Hive On Spark还处于试验阶段,慎用。

    3.9 使用向量化查询

      向量化查询执行通过一次性批量执行1024行而不是每次单行执行,从而提供扫描、聚合、筛选器和连接等操作的性能。在Hive 0.13中引入,此功能显着提高了查询执行时间,并可通过两个参数设置轻松启用: 

      设置hive.vectorized.execution.enabled = true;
      设置hive.vectorized.execution.reduce.enabled = true;

    3.10 设置cost based query optimization

      Hive自0.14.0开始,加入了一项“Cost based Optimizer”来对HQL执行计划进行优化,这个功能通过“hive.cbo.enable”来开启。在Hive 1.1.0之后,这个feature是默认开启的,它可以自动优化HQL中多个JOIN的顺序,并选择合适的JOIN算法。

      Hive在提供最终执行前,优化每个查询的执行逻辑和物理执行计划。这些优化工作是交给底层来完成的。根据查询成本执行进一步的优化,从而产生潜在的不同决策:如何排序连接,执行哪种类型的连接,并行度等等。要使用基于成本的优化(也称为CBO),请在查询开始设置以下参数:

      设置hive.cbo.enable = true;
      设置hive.compute.query.using.stats = true;
      设置hive.stats.fetch.column.stats = true;
      设置hive.stats.fetch.partition.stats = true;

    3.11 模式选择

    • 本地模式

      对于大多数情况,Hive可以通过本地模式在单台机器上处理所有任务。对于小数据,执行时间可以明显被缩短。通过set hive.exec.mode.local.auto = true(默认为false)设置为本地模式,本地模式涉及到三个参数:

       set hive.exec.mode.local.auto=true; 是打开hive自动判断是否启动本地模式的开关,但是只是打开这个参数不能保证启动本地模式,要当map任务数不超过hive.exec.mode.local.auto.input.files.max的个数并且map输入文件大小不超过hive.exec.mode.local.auto.inputbytes.max所指定的大小时,才能启动本地模式。

      如下:用户可以通过设置hive.exec.mode.local.auto的值为true,来让Hive在适当的时候自动启动这个优化。

    set hive.exec.mode.local.auto=true;  //开启本地mr
    //设置local mr的最大输入数据量,当输入数据量小于这个值时采用local  mr的方式,默认为134217728,即128M
    set hive.exec.mode.local.auto.inputbytes.max=50000000;
    //设置local mr的最大输入文件个数,当输入文件个数小于这个值时采用local mr的方式,默认为4
    set hive.exec.mode.local.auto.input.files.max=10;
    • 并行模式

      Hive会将一个查询转化成一个或多个阶段。这样的阶段可以是MapReduce阶段、抽样阶段、合并阶段、limit阶段。默认情况下,Hive一次只会执行一个阶段,由于job包含多个阶段,而这些阶段并非完全相互依赖,即:这些阶段可以并行执行,可以缩短整个job的执行时间。设置参数,set hive.exec.parallel=true,或者通过配置文件来完成:

      hive> set hive.exec.parallel;

    • 严格模式

      Hive提供一个严格模式,可以防止用户执行那些可能产生意想不到的影响查询,通过设置Hive.mapred.modestrict来完成。

      set Hive.mapred.modestrict;

    3.12 JVM重用

      Hadoop通常是使用派生JVM来执行map和reduce任务的。这时JVM的启动过程可能会造成相当大的开销,尤其是执行的job包含成百上千的task任务的情况。JVM重用可以使得JVM示例在同一个job中时候,通过参数mapred.job.reuse.jvm.num.tasks来设置。

    3.13 推测执行

      Hadoop推测执行可以触发执行一些重复的任务,尽管因对重复的数据进行计算而导致消耗更多的计算资源,不过这个功能的目标是通过加快获取单个task的结果以侦测执行慢的TaskTracker加入到没名单的方式来提高整体的任务执行效率。Hadoop的推测执行功能由2个配置控制着,通过mapred-site.xml中配置 

      mapred.map.tasks.speculative.execution=true
      mapred.reduce.tasks.speculative.execution=true

  • 相关阅读:
    CodeForces-1100C NN and the Optical Illusion 简单数学
    HDU-3038 How Many Answers Are Wrong 并查集
    POJ-1321 棋盘问题 DFS
    POJ-1651 Multiplication Puzzle 区间DP
    HDU-2037 今年暑假不AC 贪心
    排序算法的总结
    UVa-679 Dropping Balls 二叉树
    Coursera机器学习——Recommender System测验
    页面置换算法及例题
    这篇最新MySQL面试题请查收
  • 原文地址:https://www.cnblogs.com/whywy/p/12733017.html
Copyright © 2011-2022 走看看