本文参考:黑泽君相关博客
本文是我总结日常工作中遇到的坑,结合黑泽君相关博客,选取、补充了部分内容。
表的优化
小表join大表、大表join小表
将key相对分散,并且数据量小的表放在join的左边,这样可以有效减少内存溢出错误发生的几率;
再进一步,可以使用map join让小的维度表(1000条以下的记录条数)先进内存。在map端完成reduce。
实际测试发现:新版的hive已经对小表JOIN大表和大表JOIN小表进行了优化。小表放在左边和右边已经没有明显区别
hive> set hive.auto.convert.join;
hive.auto.convert.join=true
这一方面的优化个人觉得也就这个样了。
大表join大表
有时join超时是因为某些key对应的数据太多,而相同key对应的数据都会发送到相同的reducer上,从而导致内存不够。
出现上述问题的主要原因有两个:
数据倾斜
存在大量异常值,比如空值,坏数据等。
- 异常值
下面是两个大表
nullbigtable:key包含null
bigtable:关联表
hive> insert overwrite table resulttable
>select n.*
>from nullbigtable n
>left join
>bigtable o
>on
>n.id=o.id;
.
.
.
Time taken: 40.346 seconds, Fetched: 1342933 row(s)
hive> insert overwrite table resulttable
>select n.*
>from
>(select * from nullbigtable where id is not null) n
>left join
>bigtable o
>on
>n.id=o.id;
.
.
.
Time taken: 38.346 seconds, Fetched: 1321347 row(s)
上面看起来稍微有点作用,可能是我的测试null设置的比较少。
- 数据倾斜
如果本身null数据是合法的,或异常数据需要保留,那么就要换种方式处理
给异常值设置随机数,目的是为了让null或一场数据均匀分布到各个reducer中
insert overwrite table jointable
hive>select n.*
>from nullbigtable n
>full join
>bigtable o
>on
>case when n.id is null then concat('hive', rand()) else n.id end=o.id;
笛卡尔积
尽量避免笛卡尔积,join的时候不加on条件,或者无效的on条件。
Hive只能使用1个reducer来完成笛卡尔积(笛卡尔积过大会撑爆内存)。
MapJoin
MapJoin工作机制
通过MapReduce Local Task,将小表读入内存
生成HashTableFiles上传至Distributed Cache中,这里会对HashTableFiles进行压缩。
MapReduce Job在Map阶段,每个Mapper从Distributed Cache读取HashTableFiles到内存中
顺序扫描大表,在Map阶段直接进行Join,将数据传递给下一个MapReduce任务。
测试
关闭Mapjoin功能
hive> set hive.auto.convert.join=false;
hive> select
> big.*
> from
> tmp.orclocal small
> join
> tmp.orcTest big
> on small.userid=big.userid
> limit 1;
Query ID = hdfs_20190716161515_f462647a-14fd-4808-8eb5-4c60dc5663f1
Total jobs = 1
Launching Job 1 out of 1
.
.
.
Stage-Stage-1: Map: 6 Reduce: 4 Cumulative CPU: 222.55 sec HDFS Read: 251052684 HDFS Write: 1138 SUCCESS
Total MapReduce CPU Time Spent: 3 minutes 42 seconds 550 msec
OK
02012138
Time taken: 48.529 seconds, Fetched: 1 row(s)
hive> select
> big.*
> from
> tmp.orcTest big
> join
> tmp.orclocal small
> on small.userid=big.userid
> limit 1;
Query ID = hdfs_20190716161515_6bac9e09-f579-42c9-84ab-2fb9ef5c8760
Total jobs = 1
Launching Job 1 out of 1
.
.
.
Stage-Stage-1: Map: 6 Reduce: 4 Cumulative CPU: 219.75 sec HDFS Read: 251048180 HDFS Write: 1138 SUCCESS
Total MapReduce CPU Time Spent: 3 minutes 39 seconds 750 msec
OK
02012138
Time taken: 47.599 seconds, Fetched: 1 row(s)
开启Mapjoin功能
hive> set hive.auto.convert.join=true;
hive> select
> big.*
> from
> tmp.orclocal small
> join
> tmp.orcTest big
> on small.userid=big.userid
> limit 1;
Query ID = hdfs_20190716161818_85f3ca54-c06b-4d21-8597-7feb86541f54
Total jobs = 1
.
.
.
Total MapReduce CPU Time Spent: 39 seconds 800 msec
OK
02012138
Time taken: 20.849 seconds, Fetched: 1 row(s)
hive>
> select
> big.*
> from
> tmp.orcTest big
> join
> tmp.orclocal small
> on small.userid=big.userid
> limit 1;
Query ID = hdfs_20190716161919_8774a3a0-baa4-4d87-afdf-4701cd2dcba6
Total jobs = 1
.
.
.
Stage-Stage-3: Map: 5 Cumulative CPU: 38.98 sec HDFS Read: 175115579 HDFS Write: 1440 SUCCESS
Total MapReduce CPU Time Spent: 38 seconds 980 msec
OK
02012138
Time taken: 19.672 seconds, Fetched: 1 row(s)
对比两个结果,开启mapjoin几乎效率提升了一倍
group by
默认情况下,Map阶段key相同的数据会全部分发给一个Reduce,
当一个key数据过大时就会出现数据倾斜导致任务耗时长甚至失败。
但是并不是所有的聚合操作都需要在Reduce端完成,
很多聚合操作都可以先在Map端进行部分聚合(预处理),
最后在Reduce端得出最终结果。(类似于Combine)
是否在Map端进行聚合,默认为true
hive> set hive.map.aggr;
hive.map.aggr=true
设置在Map端进行聚合操作的条目数目
hive> set hive.groupby.mapaggr.checkinterval;
hive.groupby.mapaggr.checkinterval=100000
有数据倾斜的时候进行负载均衡(默认是false)
hive> set hive.groupby.skewindata;
hive.groupby.skewindata=false
设置对应值 老规矩
当选项设定为
shive.map.aggr=true
,生成的查询计划会有两个MR Job。
第一个MRJob中,Map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的group by key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;
第二个MR Job再根据预处理的数据结果按照group by key分布到Reduce中(这个过程可以保证相同的group by key被分布到同一个Reduce中),最后完成最终的聚合操作。
案例:
设置5个reduce个数
hive> set mapreduce.job.reduces=5;
执行去重id查询(只能用一个Reduce)
hive> select count(distinct userid ) from tmp.orcTest;
Query ID = hdfs_20190716163434_54ae4ccc-a95f-4a5c-924f-6fb8e50c8393
Total jobs = 1
.
.
.
Stage-Stage-1: Map: 5 Reduce: 1 Cumulative CPU: 233.47 sec HDFS Read: 13234944 HDFS Write: 8 SUCCESS
Total MapReduce CPU Time Spent: 3 minutes 53 seconds 470 msec
OK
1260351
Time taken: 76.007 seconds, Fetched: 1 row(s)
采用GROUP BY去重id(可以使用多个Reduce,子查询帮我们做去重工作,把数据分发给了5个Reduce进行去重处理,大数据量的情况下效率高)
hive> select count(userid) from (select userid from tmp.orcTest group by userid) a;
Query ID = hdfs_20190716163535_5136be23-60bc-4808-a87f-338975588bb2
Total jobs = 2
.
.
.
1260351
Time taken: 90.242 seconds, Fetched: 1 row(s)
从结果看,貌似翻车了,这个是因为数据量问问题,一共也就才1000W的数据量,效果不明显,如果数据量大几个量级效果就很明显了
count distinct() 踩坑
如果有个一需求是对 两个字段进行去重 计数,
相信很多人都会踩到我遇到的坑。
直接用count (distinct c1,c2)
但是这样统计是错的
hive> select count(distinct userid ,errorcode) from tmp.orcTest;
Query ID = hdfs_20190716164242_b8198330-e1a7-4e23-ab35-2533dc228507
Total jobs = 1
.
.
.
OK
2196359
Time taken: 124.874 seconds, Fetched: 1 row(s)
hive>
> select count(1) from (select userid,errorcode from tmp.orcTest group by userid,errorcode) a;
Query ID = hdfs_20190716164444_08c11673-6530-4c1a-bdfd-b2768fc7083f
Total jobs = 2
OK
2196359
Time taken: 137.943 seconds, Fetched: 1 row(s)
我擦 又翻车了 等我下次找个案例
count ( distinct )
中 distinct 的字段只能有一个,
如果要对多个字段去重使用 grouy by
动态分区
关系型数据库中,对分区表insert数据时候,数据库自动会根据分区字段的值,将数据插入到相应的分区中,
Hive中也提供了类似的机制,即动态分区(Dynamic Partition)的概念,
在使用Hive的动态分区前,需要进行相应的配置。
查看、设置相关属性
开启动态分区功能(默认true,开启)
hive> set hive.exec.dynamic.partition;
hive.exec.dynamic.partition=true
设置为非严格模式(动态分区的模式,默认strict,表示必须指定至少一个分区为静态分区,
nonstrict模式表示允许所有的分区字段都可以使用动态分区。)
hive> set hive.exec.dynamic.partition.mode;
hive.exec.dynamic.partition.mode=strict
在所有执行MR的节点上,最大一共可以创建多少个动态分区。
hive> set hive.exec.max.dynamic.partitions;
hive.exec.max.dynamic.partitions=1000
在每个执行MR的节点上,最大可以创建多少个动态分区。该值要大于可能分区的个数。
比如一周七天有7个分区,如果设置为6就会报错
hive> set hive.exec.max.dynamic.partitions.pernode;
hive.exec.max.dynamic.partitions.pernode=100
整个MR Job中,最大可以创建多少个HDFS文件。
hive> set hive.exec.max.created.files;
hive.exec.max.created.files=100000
当有空分区生成时,是否抛出异常。一般默认即可
hive> set hive.error.on.empty.partition;
hive.error.on.empty.partition=false
数据倾斜
Map数量
Q:决定Map数量的因素?
A:主要有三个因素:
- input的文件总个数
- input的文件大小
- 集群设置的文件块大小
- 依稀记得还有一个map数量限制(待确认)
Q:map数量越多越好?
A:并不是,如果有很多小文件(远小于一个block的大小),每个小文件会被看作一个块当做一个map任务来完成。这个时候可能启动map任务耗费的代价远高于逻辑处理的代价,造成资源浪费严重。
Q:是不是保证每个map处理接近一个block块大小的数据量即可?
A:并不是,正常情况下一个block对应一个map,但是如果对应的数据只有一两个字段,且字段内容很短,那么可能一个block包含的记录条数就会达到几千万,如果处理的逻辑还是相对复杂的,那么一个map很大概率也是吃不消的。
对于上述问题,我们要根据实际的情况增加或者减少map的数量。
- 合并小文件(减少map)
在map执行前合并小文件,减少map数
CombineHiveInputFormat
系统默认的格式,具有小文件合并的功能
hive> set hive.input.format;
hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
HiveInputFormat没有对小文件合并功能。
- 复杂文件(增加Map数)
Q:什么情况下考虑增加map?
A: 下面几个情况
- 当input的文件都很大
- 任务逻辑处理复杂
- map执行非常慢的时候
- 文件列数量较少
当上述原因可能导致任务处理的速度变慢时候,可以考虑尝试增加map
当然有个公式也可以来判断,不过这个公式暂时没研究
computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M
Reduce 数量
- 调整reduce个数方法
方法一: 通过设置参数
每个Reduce处理的数据量(默认是256MB,args1)
hive> set hive.exec.reducers.bytes.per.reducer;
hive.exec.reducers.bytes.per.reducer=67108864
每个任务最大的reduce数(默认为1009,args2)
hive> set hive.exec.reducers.max;
hive.exec.reducers.max=1099
计算公式
N=min(args2,总输入数据量/args1)
方法二: 通过配置文件
hadoop的mapred-default.xml中修改
mapreduce.job.reduces=15;
Q: reduce是不是越多越好?
A: 不是reduce太多也会带来一些列问题:
- 启动、初始化reduce会消耗大量资源和时间。
- 一个reduce对应一个文件,如果生成的是小文件会对后续任务或结果文件产生不好的影响(hadoop 小文件问题)
要让reduce处理数据量大小要合适,而不要禁锢自己的思维,一定要多少,合适就好
并行执行
打开任务并行执行(默认关闭)
hive> set hive.exec.parallel;
hive.exec.parallel=false
同一个sql允许最大并行度(默认为8)
hive> set hive.exec.parallel.thread.number;
hive.exec.parallel.thread.number=8
并行执行在系统资源比较空闲的时候才有优势,否则,没资源,也谈不上并行了。
严格模式
严格模式是为了防止用户执行那些可能意想不到的、不好的影响的查询。
设置方法一:
直接设置参数
hive> set hive.mapred.mode;
hive.mapred.mode=nonstrict
设置方法二:
修改配置文件
hive-default.xml.template
<property>
<name>hive.mapred.mode</name>
<value>strict</value>
<description>
执行Hive操作的模式。
在严格模式下,不允许运行一些有风险的查询。它们包括:
笛卡儿积
没有为查询选择分区。
比较bigint和字符串。
比较 bigints 和 doubles.
Order by 没有 limit(只是 Order by 一般的查询 没有limit 也是可以的).
</description>
</property>
Order by 没有 limit
hive> set hive.mapred.mode=strict;
hive> select * from tmp.orcTest order by userid desc;
FAILED: SemanticException 1:36 In strict mode, if ORDER BY is specified, LIMIT must also be specified. Error encountered near token 'userid'
没有为查询选择分区
hive> set hive.mapred.mode=strict;
hive> select * from orcTest_d;
FAILED: SemanticException [Error 10041]: No partition predicate found for Alias "orcTest_d" Table "orcTest_d"
JVM重用
场景:难避免小文件的场景或task特别多的场景,这类场景大多数执行时间都很短。
问题:Hadoop的默认配置通常是使用派生JVM来执行map和Reduce任务的。这时JVM的启动过程可能会造成相当大的开销,尤其是执行的job包含有成百上千task任务的情况。
解决方法:JVM重用可以使得JVM实例在同一个job中重新使用N次
配置方法一:
hive> set mapreduce.job.jvm.numtasks;
mapreduce.job.jvm.numtasks=1
配置方法二:
Hadoop的mapred-site.xml
<property>
<name>mapreduce.job.jvm.numtasks</name>
<value>10</value>
<description>How many tasks to run per jvm. If set to -1, there is
no limit.
</description>
</property>
一般设置10-20,需要经过根据具体业务确定
Q:jvm重用没有缺点吗?
A:开启JVM重用将一直占用使用到的task插槽,以便进行重用,直到任务完成后才能释放。
如果出现数据倾斜导致某几个task 执行极度缓慢,那么保留的插槽就会一直空闲着却无法被其他的job使用,直到所有的task都结束了才会释放。
推测执行
推测执行简单的解释就是,某个task执行的进度明显慢于其他的task,这可能是程序问题,也可能是该节点bug、或者资源问题,于是在其他地方重新启动一个相同的task处理相同数据, 让他们赛跑,谁赢了用谁的,结束慢的。
配置方法一:
如果为true,则可以并行执行某些map任务的多个实例。
hive> set mapreduce.map.speculative;
mapreduce.map.speculative=false
如果为true,则可以并行执行某些reduce任务的多个实例。
hive> set mapreduce.reduce.speculative;
mapreduce.reduce.speculative=true
配置方法二:
<property>
<name>hive.mapred.reduce.tasks.speculative.execution</name>
<value>true</value>
<description>是否开启reduce的推测执行机制 </description>
</property>