Hive是一种底层封装了Hadoop的数据仓库处理工具,使用类SQL的HiveQL语言实现数据查询分析。Hive的数据存储在Hadoop兼容的文件系统(例如HDFS、Amazon S3)中,HiveQL查询会转化为MapReduce程序在Hadoop集群上执行。
在优化时,把Hive SQL当作MapReduce程序来读,会有意想不到的惊喜。理解Hadoop的核心能力(partition,sort等),是Hive优化的根本。
Hadoop处理数据的过程,有几个显著的特征:
1.不怕数据多,就怕数据倾斜。
2.对jobs数比较多的作业运行效率相对比较低,比如即使有几百行的表,如果多次关联多次汇总,产生十几个jobs,没半小时是跑不完的。map reduce作业初始化的时间是比较长的。
3.对sum,count,max来说,不存在数据倾斜问题。Hadoop在map端的合并汇总优化,使数据倾斜不成问题。
4.对count(distinct ),效率较低,数据量一多,准出问题,如果是多count(distinct )效率更低。因为count(distinct)是按group by字段分组,按distinct字段排序,一般这种分布方式是很倾斜的。
Hive优化可以从这几个方面着手:
1.好的模型设计事半功倍。
2.解决数据倾斜问题。
3.减少job数。
4.设置合理的map reduce的task数,能有效提升性能。(比如,10w+级别的计算,用160个reduce,那是相当的浪费,1个足够)。
5.自己动手写sql解决数据倾斜问题是个不错的选择。set hive.groupby.skewindata=true;这是通用的算法优化,但算法优化总是漠视业务,习惯性提供通用的解决方法。 Etl开发人员更了解业务,更了解数据,所以通过业务逻辑解决倾斜的方法往往更精确,更有效。
6.对count(distinct)采取漠视的方法,尤其数据大的时候很容易产生倾斜问题,不抱侥幸心理。自己动手,丰衣足食。
7.对小文件进行合并,是行至有效的提高调度效率的方法,假如我们的作业设置合理的文件数,对云梯的整体调度效率也会产生积极的影响。
8.优化时把握整体,单个作业最优不如整体最优。
优化性能
1.map阶段优化
主要是确定合适的map数
num_map_tasks = max[${mapred.min.split.size},min(${dfs.block.size},${mapred.max.split.size})]
mapred.min.split.size: 指的是数据的最小分割单元大小;min的默认值是1B
mapred.max.split.size: 指的是数据的最大分割单元大小;max的默认值是256MB
dfs.block.size: 指的是HDFS设置的数据块大小。个已经指定好的值,而且这个参数默认情况下hive是识别不到的
2.reduce阶段优化
选择合适的reduce task数量,与map阶段优化不同的是,reduce优化时,可以直接设置mapreduce.job.reduces(等价于mapred.reduce.tasks)来指定最终的reduce个数,hive就不会用estimation函数来计算reduce个数。当这个参数没有指定时(默认为-1),可通过设置如下参数来指定reduce个数
num_reduce_tasks = min[${hive.exec.reducers.max},(${input.size}/${hive.exec.reducers.bytes.per.reducer})]
hive.exec.reducers.max:最大的reduce个数(默认是1009)
input.size:输入数据的大小(总字节数)
hive.exec.reducers.bytes.per.reducer:每个reduce能处理的字节数
3.数据倾斜优化
数据倾斜由于数据分布不均匀,导致大量地集中到一点,造成数据热点,也就是某个reduce节点处理的数据量是其他reduce的n倍,导致明显的木桶效应,任务进度长时间卡在99%左右。
数据倾斜的原因:
(1)key分布不均匀
(2)业务数据本身的特性
(3)建表时考虑不周
(4)某些sql语句本身就有数据倾斜
3.1 参数调优
(1) hive.map.aggr=true
默认就是true,Map端部分聚合,相当于Combiner
(2) hive.groupby.skewindata=true
第一个MR job中,Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group By Key有可能被分到不同的Reduce中,从而达到**负载均衡**的目的;第二个MR job再根据预处理的数据结果按照Group By Key分布到Reduce中(这个过程可以保证相同的Group By Key 被分配到相同的reduce中),最终完成聚合的操作。
3.2 sql语句调优
(1) 大小表join
使用mapjoin让小的维度表(1000条以下的记录条数)先进内存,将小表放到join左边,减少OOM的机率。
select /*+mapjoin(a)*/ count(1) from tb_a a left outer join tb_b b on a.uid=b.uid;
(2) 小表不大不小,怎么用mapjoin解决数据倾斜
使用map join解决小表(记录数少)关联大表的数据倾斜问题,这个方法使用的频率非常高,但如果小表很大,大到map join会出现bug或异常,这时就需要特别的处理.
以下例子:
select * from log a left outer join users b on a.user_id = b.user_id;
users 表有 600w+ 的记录,把 users 分发到所有的 map 上也是个不小的开销,而且 map join 不支持这么大的小表。如果用普通的 join,又会碰到数据倾斜的问题。
解决方法:
select /*+mapjoin(x)*/* from log a left outer join ( select /*+mapjoin(c)*/d.* from ( select distinct user_id from log ) c join users d on c.user_id = d.user_id ) x on a.user_id = b.user_id;
假如,log里user_id有上百万个,这就又回到原来map join问题。所幸,每日的会员uv不会太多,有交易的会员不会太多,有点击的会员不会太多,有佣金的会员不会太多等等。所以这个方法能解决很多场景下的数据倾斜问题。
(3) 空值产生的数据倾斜
场景:如日志中,常会有信息丢失的问题,比如全网日志中的user_id,如果取其中的user_id和bmw_users关联,会碰到数据倾斜的问题。
解决方法1: user_id为空的不参与关联
Select * From log a Join bmw_users b On a.user_id is not null And a.user_id = b.user_id Union all Select * from log a where a.user_id is null;
解决方法2 :赋与空值分新的key值
Select * from log a left outer join bmw_users b on case when a.user_id is null then concat(‘dp_hive’,rand() ) else a.user_id end = b.user_id;
结论:方法2比方法效率更好,不但io少了,而且作业数也少了。方法1 log读取两次,jobs是2。方法2 job数是1 。这个优化适合无效id(比如-99,’’,null等)产生的倾斜问题。把空值的key变成一个字符串加上随机数,就能把倾斜的数据分到不同的reduce上 ,解决数据倾斜问题。附上hadoop通用关联的实现方法(关联通过二次排序实现的,关联的列为parition key,关联的列c1和表的tag组成排序的group key,根据parition key分配reduce。同一reduce内根据group key排序)。
(4) 不同数据类型关联产生的数据倾斜
场景:一张表s8的日志,每个商品一条记录,要和商品表关联。但关联却碰到倾斜的问题。s8的日志中有字符串商品id,也有数字的商品id,类型是string的,但商品中的数字id是bigint的。猜测问题的原因是把s8的商品id转成数字id做hash来分配reduce,所以字符串id的s8日志,都到一个reduce上了,解决的方法验证了这个猜测。
解决方法:把数字类型转换成字符串类型
Select * from s8_log a Left outer join r_auction_auctions b On a.auction_id = cast(b.auction_id as string);