zoukankan      html  css  js  c++  java
  • hive 总结四(优化)

    本文参考:黑泽君相关博客
    本文是我总结日常工作中遇到的坑,结合黑泽君相关博客,选取、补充了部分内容。

    表的优化

    小表join大表、大表join小表

    将key相对分散,并且数据量小的表放在join的左边,这样可以有效减少内存溢出错误发生的几率;
    再进一步,可以使用map join让小的维度表(1000条以下的记录条数)先进内存。在map端完成reduce。

    实际测试发现:新版的hive已经对小表JOIN大表和大表JOIN小表进行了优化。小表放在左边和右边已经没有明显区别

    hive> set hive.auto.convert.join;
    hive.auto.convert.join=true
    

    这一方面的优化个人觉得也就这个样了。


    大表join大表

    有时join超时是因为某些key对应的数据太多,而相同key对应的数据都会发送到相同的reducer上,从而导致内存不够。
    出现上述问题的主要原因有两个:
    数据倾斜
    存在大量异常值,比如空值,坏数据等。

    • 异常值
      下面是两个大表
      nullbigtable:key包含null
      bigtable:关联表
    hive> insert overwrite table resulttable 
        >select n.* 
        >from nullbigtable n 
        >left join 
        >bigtable o 
        >on 
        >n.id=o.id;
    .
    .
    .
    Time taken: 40.346 seconds, Fetched: 1342933 row(s)
    
    hive> insert overwrite table resulttable 
        >select n.* 
        >from 
        >(select * from nullbigtable where id is not null) n 
        >left join 
        >bigtable o 
        >on 
        >n.id=o.id;
    .
    .
    .
    Time taken: 38.346 seconds, Fetched: 1321347 row(s)
    
    上面看起来稍微有点作用,可能是我的测试null设置的比较少。
    
    • 数据倾斜
      如果本身null数据是合法的,或异常数据需要保留,那么就要换种方式处理

    给异常值设置随机数,目的是为了让null或一场数据均匀分布到各个reducer中

    insert overwrite table jointable
    hive>select n.* 
        >from nullbigtable n 
        >full join 
        >bigtable o 
        >on 
        >case when n.id is null then concat('hive', rand()) else n.id end=o.id;
    

    笛卡尔积

    尽量避免笛卡尔积,join的时候不加on条件,或者无效的on条件。
    Hive只能使用1个reducer来完成笛卡尔积(笛卡尔积过大会撑爆内存)。


    MapJoin

    MapJoin工作机制

    通过MapReduce Local Task,将小表读入内存
    生成HashTableFiles上传至Distributed Cache中,这里会对HashTableFiles进行压缩。

    MapReduce Job在Map阶段,每个Mapper从Distributed Cache读取HashTableFiles到内存中
    顺序扫描大表,在Map阶段直接进行Join,将数据传递给下一个MapReduce任务。

    测试
    关闭Mapjoin功能
    hive> set hive.auto.convert.join=false;
    hive> select
        > big.*
        > from 
        > tmp.orclocal small
        > join 
        > tmp.orcTest  big
        > on small.userid=big.userid
        > limit 1;
    Query ID = hdfs_20190716161515_f462647a-14fd-4808-8eb5-4c60dc5663f1
    Total jobs = 1
    Launching Job 1 out of 1
    .
    .
    .
    Stage-Stage-1: Map: 6  Reduce: 4   Cumulative CPU: 222.55 sec   HDFS Read: 251052684 HDFS Write: 1138 SUCCESS
    Total MapReduce CPU Time Spent: 3 minutes 42 seconds 550 msec
    OK
    02012138
    Time taken: 48.529 seconds, Fetched: 1 row(s)
    
    hive> select
        > big.*
        > from 
        > tmp.orcTest  big
        > join 
        > tmp.orclocal small
        > on small.userid=big.userid
        > limit 1;
    Query ID = hdfs_20190716161515_6bac9e09-f579-42c9-84ab-2fb9ef5c8760
    Total jobs = 1
    Launching Job 1 out of 1
    .
    .
    .
    Stage-Stage-1: Map: 6  Reduce: 4   Cumulative CPU: 219.75 sec   HDFS Read: 251048180 HDFS Write: 1138 SUCCESS
    Total MapReduce CPU Time Spent: 3 minutes 39 seconds 750 msec
    OK
    02012138
    Time taken: 47.599 seconds, Fetched: 1 row(s)
    
    
    开启Mapjoin功能
    hive> set hive.auto.convert.join=true;
    hive> select
        > big.*
        > from 
        > tmp.orclocal small
        > join 
        > tmp.orcTest  big
        > on small.userid=big.userid
        > limit 1;
    Query ID = hdfs_20190716161818_85f3ca54-c06b-4d21-8597-7feb86541f54
    Total jobs = 1
    .
    .
    .
    Total MapReduce CPU Time Spent: 39 seconds 800 msec
    OK
    02012138
    Time taken: 20.849 seconds, Fetched: 1 row(s)
    
    hive> 
        > select
        > big.*
        > from 
        > tmp.orcTest  big
        > join 
        > tmp.orclocal small
        > on small.userid=big.userid
        > limit 1;
    Query ID = hdfs_20190716161919_8774a3a0-baa4-4d87-afdf-4701cd2dcba6
    Total jobs = 1
    .
    .
    .
    Stage-Stage-3: Map: 5   Cumulative CPU: 38.98 sec   HDFS Read: 175115579 HDFS Write: 1440 SUCCESS
    Total MapReduce CPU Time Spent: 38 seconds 980 msec
    OK
    02012138
    Time taken: 19.672 seconds, Fetched: 1 row(s)
    

    对比两个结果,开启mapjoin几乎效率提升了一倍


    group by

    默认情况下,Map阶段key相同的数据会全部分发给一个Reduce,
    当一个key数据过大时就会出现数据倾斜导致任务耗时长甚至失败。

    但是并不是所有的聚合操作都需要在Reduce端完成,
    很多聚合操作都可以先在Map端进行部分聚合(预处理),
    最后在Reduce端得出最终结果。(类似于Combine)

    是否在Map端进行聚合,默认为true
    hive> set hive.map.aggr;
    hive.map.aggr=true
    
    设置在Map端进行聚合操作的条目数目
    hive> set hive.groupby.mapaggr.checkinterval;
    hive.groupby.mapaggr.checkinterval=100000
    
    有数据倾斜的时候进行负载均衡(默认是false)
    hive> set hive.groupby.skewindata;
    hive.groupby.skewindata=false
    
    设置对应值 老规矩
    

    当选项设定为shive.map.aggr=true,生成的查询计划会有两个MR Job。

    第一个MRJob中,Map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的group by key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;

    第二个MR Job再根据预处理的数据结果按照group by key分布到Reduce中(这个过程可以保证相同的group by key被分布到同一个Reduce中),最后完成最终的聚合操作。

    案例:
    设置5个reduce个数
    hive> set mapreduce.job.reduces=5;
    
    执行去重id查询(只能用一个Reduce)
    hive> select count(distinct userid ) from tmp.orcTest;
    Query ID = hdfs_20190716163434_54ae4ccc-a95f-4a5c-924f-6fb8e50c8393
    Total jobs = 1
    .
    .
    .
    Stage-Stage-1: Map: 5  Reduce: 1   Cumulative CPU: 233.47 sec   HDFS Read: 13234944 HDFS Write: 8 SUCCESS
    Total MapReduce CPU Time Spent: 3 minutes 53 seconds 470 msec
    OK
    1260351
    Time taken: 76.007 seconds, Fetched: 1 row(s)
    
    采用GROUP BY去重id(可以使用多个Reduce,子查询帮我们做去重工作,把数据分发给了5个Reduce进行去重处理,大数据量的情况下效率高)
    hive> select count(userid) from (select userid from tmp.orcTest group by userid) a;
    Query ID = hdfs_20190716163535_5136be23-60bc-4808-a87f-338975588bb2
    Total jobs = 2
    .
    .
    .
    1260351
    Time taken: 90.242 seconds, Fetched: 1 row(s)
    
    从结果看,貌似翻车了,这个是因为数据量问问题,一共也就才1000W的数据量,效果不明显,如果数据量大几个量级效果就很明显了
    
    count distinct() 踩坑

    如果有个一需求是对 两个字段进行去重 计数,
    相信很多人都会踩到我遇到的坑。
    直接用count (distinct c1,c2)
    但是这样统计是错的

    hive> select count(distinct userid ,errorcode) from tmp.orcTest;
    Query ID = hdfs_20190716164242_b8198330-e1a7-4e23-ab35-2533dc228507
    Total jobs = 1
    .
    .
    .
    OK
    2196359
    Time taken: 124.874 seconds, Fetched: 1 row(s)
    
    hive> 
        > select count(1) from (select userid,errorcode from tmp.orcTest group by userid,errorcode) a;
    Query ID = hdfs_20190716164444_08c11673-6530-4c1a-bdfd-b2768fc7083f
    Total jobs = 2
    OK
    2196359
    Time taken: 137.943 seconds, Fetched: 1 row(s)
    我擦 又翻车了  等我下次找个案例
    

    count ( distinct ) 中 distinct 的字段只能有一个,
    如果要对多个字段去重使用 grouy by


    动态分区

    关系型数据库中,对分区表insert数据时候,数据库自动会根据分区字段的值,将数据插入到相应的分区中,
    Hive中也提供了类似的机制,即动态分区(Dynamic Partition)的概念,
    在使用Hive的动态分区前,需要进行相应的配置。

    查看、设置相关属性

    开启动态分区功能(默认true,开启)
    hive> set hive.exec.dynamic.partition;
    hive.exec.dynamic.partition=true
    
    设置为非严格模式(动态分区的模式,默认strict,表示必须指定至少一个分区为静态分区,  
    nonstrict模式表示允许所有的分区字段都可以使用动态分区。)
    hive> set hive.exec.dynamic.partition.mode;
    hive.exec.dynamic.partition.mode=strict
    
    在所有执行MR的节点上,最大一共可以创建多少个动态分区。
    hive> set hive.exec.max.dynamic.partitions;
    hive.exec.max.dynamic.partitions=1000
    
    在每个执行MR的节点上,最大可以创建多少个动态分区。该值要大于可能分区的个数。  
    比如一周七天有7个分区,如果设置为6就会报错
    hive> set hive.exec.max.dynamic.partitions.pernode;
    hive.exec.max.dynamic.partitions.pernode=100
    
    整个MR Job中,最大可以创建多少个HDFS文件。
    hive> set hive.exec.max.created.files;
    hive.exec.max.created.files=100000
    
    当有空分区生成时,是否抛出异常。一般默认即可
    hive> set hive.error.on.empty.partition;
    hive.error.on.empty.partition=false
    

    数据倾斜

    Map数量

    Q:决定Map数量的因素?

    A:主要有三个因素:

    1. input的文件总个数
    2. input的文件大小
    3. 集群设置的文件块大小
    4. 依稀记得还有一个map数量限制(待确认)

    Q:map数量越多越好?

    A:并不是,如果有很多小文件(远小于一个block的大小),每个小文件会被看作一个块当做一个map任务来完成。这个时候可能启动map任务耗费的代价远高于逻辑处理的代价,造成资源浪费严重。

    Q:是不是保证每个map处理接近一个block块大小的数据量即可?

    A:并不是,正常情况下一个block对应一个map,但是如果对应的数据只有一两个字段,且字段内容很短,那么可能一个block包含的记录条数就会达到几千万,如果处理的逻辑还是相对复杂的,那么一个map很大概率也是吃不消的。

    对于上述问题,我们要根据实际的情况增加或者减少map的数量。

    • 合并小文件(减少map)
      在map执行前合并小文件,减少map数
      CombineHiveInputFormat 系统默认的格式,具有小文件合并的功能
    hive> set hive.input.format;
    hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
    
    HiveInputFormat没有对小文件合并功能。
    
    • 复杂文件(增加Map数)

    Q:什么情况下考虑增加map?

    A: 下面几个情况

    1. 当input的文件都很大
    2. 任务逻辑处理复杂
    3. map执行非常慢的时候
    4. 文件列数量较少
      当上述原因可能导致任务处理的速度变慢时候,可以考虑尝试增加map

    当然有个公式也可以来判断,不过这个公式暂时没研究

    computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M
    

    Reduce 数量
    • 调整reduce个数方法

    方法一: 通过设置参数

    每个Reduce处理的数据量(默认是256MB,args1) 
    hive> set hive.exec.reducers.bytes.per.reducer;
    hive.exec.reducers.bytes.per.reducer=67108864
    
    每个任务最大的reduce数(默认为1009,args2)
    hive> set hive.exec.reducers.max;
    hive.exec.reducers.max=1099
    

    计算公式

    
    N=min(args2,总输入数据量/args1)
    

    方法二: 通过配置文件

    hadoop的mapred-default.xml中修改
    mapreduce.job.reduces=15;
    

    Q: reduce是不是越多越好?

    A: 不是reduce太多也会带来一些列问题:

    1. 启动、初始化reduce会消耗大量资源和时间。
    2. 一个reduce对应一个文件,如果生成的是小文件会对后续任务或结果文件产生不好的影响(hadoop 小文件问题)
      要让reduce处理数据量大小要合适,而不要禁锢自己的思维,一定要多少,合适就好

    并行执行
    打开任务并行执行(默认关闭)
    hive> set hive.exec.parallel;
    hive.exec.parallel=false
    
    同一个sql允许最大并行度(默认为8)
    hive> set hive.exec.parallel.thread.number;
    hive.exec.parallel.thread.number=8
    
    并行执行在系统资源比较空闲的时候才有优势,否则,没资源,也谈不上并行了。
    

    严格模式

    严格模式是为了防止用户执行那些可能意想不到的、不好的影响的查询。
    设置方法一:

    直接设置参数
    hive> set hive.mapred.mode;
    hive.mapred.mode=nonstrict
    

    设置方法二:

    修改配置文件
    hive-default.xml.template
    <property>
        <name>hive.mapred.mode</name>
        <value>strict</value>
        <description>
          执行Hive操作的模式。
          在严格模式下,不允许运行一些有风险的查询。它们包括:
          笛卡儿积
          没有为查询选择分区。
          比较bigint和字符串。
          比较 bigints 和 doubles.
          Order by 没有 limit(只是  Order by  一般的查询 没有limit 也是可以的).
        </description>
    </property>
    
    Order by 没有 limit
    hive> set hive.mapred.mode=strict;
    hive> select * from  tmp.orcTest order by userid desc;
    FAILED: SemanticException 1:36 In strict mode, if ORDER BY is specified, LIMIT must also be specified. Error encountered near token 'userid'
    
    没有为查询选择分区
    hive> set hive.mapred.mode=strict;
    hive> select *  from orcTest_d;
    FAILED: SemanticException [Error 10041]: No partition predicate found for Alias "orcTest_d" Table "orcTest_d"
    

    JVM重用

    场景:难避免小文件的场景或task特别多的场景,这类场景大多数执行时间都很短。

    问题:Hadoop的默认配置通常是使用派生JVM来执行map和Reduce任务的。这时JVM的启动过程可能会造成相当大的开销,尤其是执行的job包含有成百上千task任务的情况。

    解决方法:JVM重用可以使得JVM实例在同一个job中重新使用N次

    配置方法一:

    hive> set mapreduce.job.jvm.numtasks;
    mapreduce.job.jvm.numtasks=1
    

    配置方法二:

    Hadoop的mapred-site.xml
    <property>
      <name>mapreduce.job.jvm.numtasks</name>
      <value>10</value>
      <description>How many tasks to run per jvm. If set to -1, there is
      no limit. 
      </description>
    </property>
    

    一般设置10-20,需要经过根据具体业务确定

    Q:jvm重用没有缺点吗?

    A:开启JVM重用将一直占用使用到的task插槽,以便进行重用,直到任务完成后才能释放。
    如果出现数据倾斜导致某几个task 执行极度缓慢,那么保留的插槽就会一直空闲着却无法被其他的job使用,直到所有的task都结束了才会释放。


    推测执行

    推测执行简单的解释就是,某个task执行的进度明显慢于其他的task,这可能是程序问题,也可能是该节点bug、或者资源问题,于是在其他地方重新启动一个相同的task处理相同数据, 让他们赛跑,谁赢了用谁的,结束慢的。

    配置方法一:

    如果为true,则可以并行执行某些map任务的多个实例。
    hive> set mapreduce.map.speculative;
    mapreduce.map.speculative=false
    
    如果为true,则可以并行执行某些reduce任务的多个实例。
    hive> set mapreduce.reduce.speculative;
    mapreduce.reduce.speculative=true
    

    配置方法二:

    <property>
        <name>hive.mapred.reduce.tasks.speculative.execution</name>
        <value>true</value>
        <description>是否开启reduce的推测执行机制 </description>
    </property>
    
  • 相关阅读:
    SQL Server 索引的自动维护 <第十三篇>
    SQL Server 索引的图形界面操作 <第十二篇>
    python处理时间戳
    今天又犯了Java/Scala里面substring的错误
    新浪系统工程师笔试--shell
    把DEDE的在线文本编辑器换成Kindeditor不显示问题
    C语言 EOF是什么?
    Windows Server 2012 R2超级虚拟化之七 远程桌面服务的增强
    C++数据结构之最小生成树
    python sqlite 查询表的字段名 列名
  • 原文地址:https://www.cnblogs.com/lillcol/p/11198054.html
Copyright © 2011-2022 走看看