一、Fetch抓取
Fetch
抓取是指,Hive
中对某些情况的查询可以不必使用MapReduce
计算。例如:SELECT * FROM EMP
;在这种情况下,Hive
可以简单地读取EMP
对应的存储目录下的文件,然后输出查询结果到控制台。
在hive-default.xml.template
文件中hive.fetch.task.conversion
默认是more
,老版本Hive
默认是minimal
,该属性修改为more
以后,在全局查找、字段查找、limit
查找等都不走MapReduce
。
案例实操:
hive (default)> set hive.fetch.task.conversion=more;
hive (default)> select * from emp;
二、本地模式
大多数的Hadoop Job
是需要Hadoop
提供的完整的可扩展性来处理大数据集的。不过,有时Hive
的输入数据量是非常小的。在这种情况下,为查询触发执行任务消耗的时间可能会比实际job
的执行时间要多的多。对于大多数这种情况,Hive
可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间可以明显被缩短。
用户可以通过设置hive.exec.mode.local.auto
的值为true
,来让Hive
在适当的时候自动启动这个优化,默认是false
。
案例实操:
hive (default)> set hive.exec.mode.local.auto=true;
hive (default)> select * from emp cluster by deptno;
Time taken: 1.328 seconds, Fetched: 14 row(s)
三、表的优化
3.1 小表、大表Join
将key
相对分散,并且数据量小的表放在join
的左边,这样可以有效减少内存溢出错误发生的几率;再进一步,可以使用map join
让小的维度表(1000条以下的记录条数)先进内存。在map
端完成reduce
。
实际测试发现:新版的Hive
已经对小表JOIN
大表和大表JOIN
小表进行了优化。小表放在左边和右边已经没有明显区别。
3.2 大表Join大表
①空KEY过滤
有时join
超时是因为某些key
对应的数据太多,而相同key
对应的数据都会发送到相同的Reducer
上,从而导致内存不够。此时我们应该仔细分析这些异常的key
,很多情况下,这些key
对应的数据是异常数据,我们需要在SQL
语句中进行过滤。
测试不过滤空id:
hive (default)> insert overwrite table jointable
select n.* from nullidtable n
left join ori o
on n.id = o.id;
Time taken: 42.038 seconds
Time taken: 37.284 seconds
测试过滤空id:
hive (default)> insert overwrite table jointable
select n.* from (select * from nullidtable where id is not null ) n
left join ori o
on n.id = o.id;
Time taken: 31.725 seconds
Time taken: 28.876 seconds
结论:过滤空id
后MapReduce
的速度明显变快
②空KEY转换
有时虽然某个key
为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在join
的结果中,此时我们可以表a
中key
为空的字段赋一个随机的值,使得数据随机均匀地分不到不同的Reducer
上
案例实操:不随机分布空null值:
设置5个Reduce
个数:set mapreduce.job.reduces = 5;
JOIN
两张表:insert overwrite table jointable select n.* from nullidtable n left join ori b on n.id = b.id;
如图所示,出现了数据倾斜,某些Reducer
的资源消耗远大于其他Reducer
案例实操:随机分布空null值:
insert overwrite table jointable
select n.* from nullidtable n
full join ori o
on
case when n.id is null
then concat('hive', rand())
else n.id
end
= o.id;
如图所示,消除了数据倾斜,负载均衡Reducer
的资源消耗。
3.3 MapJoin(小表join大表)
如果不指定MapJoin
或者不符合MapJoin
的条件,那么Hive
解析器会将Join
操作转换成Common Join
,即在Reduce
阶段完成join
。容易发生数据倾斜。可以用MapJoin
把小表全部加载到内存在map
端进行join
,避免Reducer
处理。
案例实操:
开启MapJoin
功能(默认为true):set hive.auto.convert.join = true;
执行小表JOIN
大表语句
insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from smalltable s
join bigtable b
on s.id = b.id;
Time taken: 24.594 seconds
执行大表JOIN
小表语句
insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from bigtable b
join smalltable s
on s.id = b.id;
Time taken: 24.315 seconds
3.4 Group By
默认情况下,Map
阶段同一Key
数据分发给一个Reduce
,当一个key
数据过大时就倾斜了。默认情况下,Map
阶段同一Key
数据分发给一个Reduce
,当一个key
数据过大时就倾斜了。
开启Map端聚合参数设置:
是否在Map
端进行聚合,默认为True
:set hive.map.aggr = true
在Map
端进行聚合操作的条目数目:set hive.groupby.mapaggr.checkinterval = 100000
有数据倾斜的时候进行负载均衡(默认是false
):set hive.groupby.skewindata = true
当选项设定为true
,生成的查询计划会有两个MR Job
。第一个MR Job
中,Map
的输出结果会随机分布到Reduce
中,每个Reduce
做部分聚合操作,并输出结果,这样处理的结果是相同的Group By Key
有可能被分发到不同的Reduce
中,从而达到负载均衡的目的;第二个MR Job
再根据预处理的数据结果按照Group By Key
分布到Reduce
中(这个过程可以保证相同的Group By Key
被分布到同一个Reduce
中),最后完成最终的聚合操作。
hive (default)> select deptno from emp group by deptno;
Stage-Stage-1: Map: 1 Reduce: 5 Cumulative CPU: 23.68 sec HDFS Read: 19987 HDFS
Write: 9 SUCCESS
Total MapReduce CPU Time Spent: 23 seconds 680 msec
OK
deptno
10
20
30
优化以后
hive (default)> set hive.groupby.skewindata = true;
hive (default)> select deptno from emp group by deptno;
Stage-Stage-1: Map: 1 Reduce: 5 Cumulative CPU: 28.53 sec HDFS Read: 18209 HDFS
Write: 534 SUCCESS
Stage-Stage-2: Map: 1 Reduce: 5 Cumulative CPU: 38.32 sec HDFS Read: 15014 HDFS
Write: 9 SUCCESS
Total MapReduce CPU Time Spent: 1 minutes 6 seconds 850 msec
OK
deptno
10
20
30
3.5 Count(Distinct) 去重统计
数据量小的时候无所谓,数据量大的情况下,由于COUNT DISTINCT
的全聚合操作,即使设定了ReduceTask
个数为多个,Hive
也只会启动一个Reducer
。这就造成一个Reduce
处理的数据量太大,导致整个Job
很难完成,一般COUNT DISTINCT
使用先GROUP BY
再COUNT
的方式替换:
执行去重id
查询:
hive (default)> select count(distinct id) from bigtable;
Stage-Stage-1: Map: 1 Reduce: 1 Cumulative CPU: 7.12 sec HDFS Read: 120741990
HDFS Write: 7 SUCCESS
Total MapReduce CPU Time Spent: 7 seconds 120 msec
OK
c0
100001
Time taken: 23.607 seconds, Fetched: 1 row(s)
采用group by
去重id
hive (default)> select count(id) from (select id from bigtable group by id) a;
Stage-Stage-1: Map: 1 Reduce: 5 Cumulative CPU: 17.53 sec HDFS Read: 120752703
HDFS Write: 580 SUCCESS
Stage-Stage-2: Map: 1 Reduce: 1 Cumulative CPU: 4.29 sec HDFS Read: 9409
HDFS Write: 7 SUCCESS
Total MapReduce CPU Time Spent: 21 seconds 820 msec
OK
_c0
100001
Time taken: 50.795 seconds, Fetched: 1 row(s)
虽然会多用一个Job
来完成,MapReduce
时间也变长,但在数据量大的情况下,这个绝对是值得的。
3.6 笛卡尔积
尽量避免笛卡尔积,join
的时候不加on
条件,或者无效的on
条件,Hive
只能使用1个Reducer
来完成笛卡尔积。
3.7 行列过滤
列处理:在SELECT
中,只拿需要的列,如果有,尽量使用分区过滤,少用SELECT *
。
行处理:在分区剪裁中,当使用外关联时,如果将副表的过滤条件写在Where
后面,那么就会先全表关联,之后再过滤,比如:
案例实操:
1.测试先关联两张表,再用where
条件过滤
hive (default)> select o.id from bigtable b
join ori o on o.id = b.id
where o.id <= 10;
Time taken: 34.406 seconds, Fetched: 100 row(s)
2.通过子查询后,再关联表
hive (default)> select b.id from bigtable b
join (select id from ori where id <= 10 ) o on b.id = o.id;
Time taken: 30.058 seconds, Fetched: 100 row(s)
3.8 动态分区调整
关系型数据库中,对分区表Insert
数据时候,数据库自动会根据分区字段的值,将数据插入到相应的分区中,Hive
中也提供了类似的机制,即动态分区(Dynamic Partition
),只不过,使用Hive
的动态分区,需要进行相应的配置。
开启动态分区参数设置:
- 开启动态分区功能(默认
true
,开启):hive.exec.dynamic.partition=true
- 设置为非严格模式(动态分区的模式,默认
strict
,表示必须指定至少一个分区为静态分区,nonstrict
模式表示允许所有的分区字段都可以使用动态分区。)
hive.exec.dynamic.partition.mode=nonstrict
- 在所有执行
MR
的节点上,最大一共可以创建多少个动态分区。默认1000
hive.exec.max.dynamic.partitions=1000
- 在每个执行
MR
的节点上,最大可以创建多少个动态分区。该参数需要根据实际的数据来设定。比如:源数据中包含了一年的数据,即day
字段有365个值,那么该参数就需要设置成大于365,如果使用默认值100,则会报错。
hive.exec.max.dynamic.partitions.pernode=100
- 整个
MR Job
中,最大可以创建多少个HDFS
文件。默认100000
hive.exec.max.created.files=100000
- 当有空分区生成时,是否抛出异常。一般不需要设置。默认
false
hive.error.on.empty.partition=false
案例实操:
需求:将dept
表中的数据按照地区(loc
字段),插入到目标表dept_partition
的相应分区中。
创建目标分区表
hive (default)> create table dept_partition(id int, name string) partitioned
by (location int) row format delimited fields terminated by ' ';
设置动态分区
hive (default)> set hive.exec.dynamic.partition.mode = nonstrict;
hive (default)> insert into table dept_partition partition(location)
select deptno, dname, loc from dept;
查看目标分区表的分区情况:hive (default)> show partitions dept_partition;
3.9 分桶分区
见https://hucheng.blog.csdn.net/article/details/106684320
四、合理设置Map及Reduce数
① 通常情况下,Job
会通过input
的目录产生一个或者多个map
任务。
主要的决定因素有:input
的文件总个数,input
的文件大小,集群设置的文件块大小。
② 是不是map
数越多越好?
答案是否定的。如果一个任务有很多小文件(远远小于块大小128m
),则每个小文件也会被当做一个块,用一个map
任务来完成,而一个map
任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的map
数是受限的。
③ 是不是保证每个map
处理接近128m
的文件块,就高枕无忧了?
答案也是不一定。比如有一个127m
的文件,正常会用一个map
去完成,但这个文件只有一个或者两个小字段,却有几千万的记录,如果map
处理的逻辑比较复杂,用一个map
任务去做,肯定也比较耗时。
针对上面的问题2和3,我们需要采取两种方式来解决:即减少map
数和增加map
数;
4.1 复杂文件增加Map数
当input
的文件都很大,任务逻辑复杂,map
执行非常慢的时候,可以考虑增加map
数,来使得每个map
处理的数据量减少,从而提高任务的执行效率。
增加map
的方法为:根据computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M
公式,调整maxSize
最大值。让maxSize
最大值低于blocksize
就可以增加map
的个数。
案例实操:
1.执行查询
hive (default)> select count(*) from emp;
Hadoop job information for Stage-1: number of mappers: 1; number of
reducers: 1
2.设置最大切片值为100个字节
hive (default)> set mapreduce.input.fileinputformat.split.maxsize=100;
hive (default)> select count(*) from emp;
Hadoop job information for Stage-1: number of mappers: 6; number of
reducers: 1
4.2 小文件进行合并
①在map
执行前合并小文件,减少map
数:CombineHiveInputFormat
具有对小文件进行合并的功能(系统默认的格式)。HiveInputFormat
没有对小文件合并功能。
set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
②在Map-Reduce
的任务结束时合并小文件的设置:
-
在
map-only
任务结束时合并小文件,默认true
SET hive.merge.mapfiles = true;
-
在
map-reduce
任务结束时合并小文件,默认false
SET hive.merge.mapredfiles = true;
-
合并文件的大小,默认256
M
SET hive.merge.size.per.task = 268435456;
-
当输出文件的平均大小小于该值时,启动一个独立的
map-reduce
任务进行文件merge
SET hive.merge.smallfiles.avgsize = 16777216;
4.3 合理设置Reduce数
①调整reduce
个数方法1
- 每个
Reduce
处理的数据量默认是256MB
hive.exec.reducers.bytes.per.reducer=256000000
- 每个任务最大的
Reduce
数,默认为1009
hive.exec.reducers.max=1009
- 计算
Reducer
数的公式:N=min(参数2,总输入数据量/参数1)
②调整reduce
个数方法2
在hadoop
的mapred-default.xml
文件中修改。
设置每个job
的Reduce
个数:set mapreduce.job.reduces = 15;
③reduce
个数并不是越多越好
- 过多的启动和初始化
reduce
也会消耗时间和资源; - 另外,有多少个
reduce
,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;
在设置reduce
个数的时候也需要考虑这两个原则:处理大数据量利用合适的reduce
数;使单个reduce
任务处理数据量大小要合适;
五、其他方式
5.1 并行执行
Hive
会将一个查询转化成一个或者多个阶段。这样的阶段可以是MapReduce
阶段、抽样阶段、合并阶段、limit
阶段。或者Hive
执行过程中可能需要的其他阶段。默认情况下,Hive
一次只会执行一个阶段。不过,某个特定的job
可能包含众多的阶段,而这些阶段可能并非完全互相依赖的,也就是说有些阶段是可以并行执行的,这样可能使得整个job
的执行时间缩短。不过,如果有更多的阶段可以并行执行,那么job
可能就越快完成。
通过设置参数hive.exec.parallel
值为true
,就可以开启并发执行。不过,在共享集群中,需要注意下,如果job
中并行阶段增多,那么集群利用率就会增加。
set hive.exec.parallel=true; //打开任务并行执行
set hive.exec.parallel.thread.number=16; //同一个sql允许最大并行度,默认为8。
当然,得是在系统资源比较空闲的时候才有优势,否则,没资源,并行也起不来
5.2 严格模式
Hive提供了一个严格模式,可以防止用户执行那些可能意想不到的不好的影响的查询。通过设置属性hive.mapred.mode
值为默认是非严格模式nonstrict
。开启严格模式需要修改hive.mapred.mode
值为strict
,开启严格模式可以禁止3种类型的查询。
<property>
<name>hive.mapred.mode</name>
<value>strict</value>
</property>
- 对于分区表,除非
where
语句中含有分区字段过滤条件来限制范围,否则不允许执行。换句话说,就是用户不允许扫描所有分区。进行这个限制的原因是,通常分区表都拥有非常大的数据集,而且数据增加迅速。没有进行分区限制的查询可能会消耗令人不可接受的巨大资源来处理这个表。 - 对于使用了
order by
语句的查询,要求必须使用limit语句。因为order by
为了执行排序过程会将所有的结果数据分发到同一个Reducer
中进行处理,强制要求用户增加这个LIMIT
语句可以防止Reducer
额外执行很长一段时间。 - 限制笛卡尔积的查询。对关系型数据库非常了解的用户可能期望在执行
JOIN
查询的时候不使用ON
语句而是使用where
语句,这样关系数据库的执行优化器就可以高效地将WHERE
语句转化成那个ON
语句。不幸的是,Hive
并不会执行这种优化,因此,如果表足够大,那么这个查询就会出现不可控的情况。
5.3 JVM重用
JVM
重用是Hadoop
调优参数的内容,其对Hive
的性能具有非常大的影响,特别是对于很难避免小文件的场景或task
特别多的场景,这类场景大多数执行时间都很短。
Hadoop
的默认配置通常是使用派生JVM
来执行map
和Reduce
任务的。这时JVM
的启动过程可能会造成相当大的开销,尤其是执行的job包含有成百上千task任务的情况。JVM
重用可以使得JVM
实例在同一个job
中重新使用N
次。N
的值可以在Hadoop
的mapred-site.xml文
件中进行配置。通常在10-20之间,具体多少需要根据具体业务场景测试得出。
<property>
<name>mapreduce.job.jvm.numtasks</name>
<value>10</value>
<description>How many tasks to run per jvm. If set to -1, there is
no limit.
</description>
</property>
这个功能的缺点是,开启JVM
重用将一直占用使用到的task
插槽,以便进行重用,直到任务完成后才能释放。如果某个不平衡的job
中有某几个reduce task
执行的时间要比其他Reduce task
消耗的时间多的多的话,那么保留的插槽就会一直空闲着却无法被其他的job
使用,直到所有的task
都结束了才会释放。
5.4 推测执行
在分布式集群环境下,因为程序Bug
(包括Hadoop
本身的bug
),负载不均衡或者资源分布不均等原因,会造成同一个作业的多个任务之间运行速度不一致,有些任务的运行速度可能明显慢于其他任务(比如一个作业的某个任务进度只有50%,而其他所有任务已经运行完毕),则这些任务会拖慢作业的整体执行进度。为了避免这种情况发生,Hadoop
采用了推测执行(Speculative Execution
)机制,它根据一定的法则推测出“拖后腿”的任务,并为这样的任务启动一个备份任务,让该任务与原始任务同时处理同一份数据,并最终选用最先成功运行完成任务的计算结果作为最终结果。
设置开启推测执行参数:Hadoop
的mapred-site.xml
文件中进行配置,默认是true
<property>
<name>mapreduce.map.speculative</name>
<value>true</value>
</property>
<property>
<name>mapreduce.reduce.speculative</name>
<value>true</value>
</property>
Hive
本身也提供了配置项来控制reduce-side
的推测执行:默认是true
<property>
<name>hive.mapred.reduce.tasks.speculative.execution</name>
<value>true</value>
</property>
关于调优这些推测执行变量,还很难给一个具体的建议。如果用户对于运行时的偏差非常敏感的话,那么可以将这些功能关闭掉。如果用户因为输入数据量很大而需要执行长时间的map
或者Reduce task
的话,那么启动推测执行造成的浪费是非常巨大大。
5.5 压缩
见https://hucheng.blog.csdn.net/article/details/106769939
5.6 执行计划(Explain)
基本语法:EXPLAIN [EXTENDED | DEPENDENCY | AUTHORIZATION] query
案例实操:
- 查看下面这条语句的执行计划
hive (default)> explain select * from emp;
hive (default)> explain select deptno, avg(sal) avg_sal from emp
group by deptno;
- 查看详细执行计划
hive (default)> explain extended select * from emp;
hive (default)> explain extended select deptno, avg(sal) avg_sal from emp
group by deptno;