1.1 Core idea
Hive on MapReduce is used as the example here; the same reasoning applies to Hive on Spark and other execution engines.
1.2 Some common optimization approaches
1.2.1 IO
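1.2.2 Data skew
Use count(distinct) with caution. Without a GROUP BY, count(distinct) funnels all values through a single reducer; consider rewriting it as a GROUP BY subquery, which deduplicates in parallel across many reducers before counting. Also watch out for data skew caused by NULL values: every row whose key is NULL hashes to the same reducer.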
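The rewrite suggested above turns `select count(distinct uid) from t` into something like `select count(*) from (select uid from t where uid is not null group by uid) tmp`. The Python sketch below is only a simulation of the two dataflows, not Hive code: the function names, the list-of-values input, and the `num_reducers` parameter are assumptions for illustration. It shows why the results match (count(distinct) ignores NULLs, so the subquery filters them) and where the work goes: the GROUP BY job spreads deduplication over several reducers, leaving the final count with one row per distinct value.

```python
from collections import defaultdict

def count_distinct_one_reducer(values):
    """count(distinct v) without GROUP BY: every value goes to a single reducer."""
    seen = set()
    for v in values:
        if v is not None:  # count(distinct ...) ignores NULLs
            seen.add(v)
    return len(seen)

def count_distinct_group_by(values, num_reducers=4):
    """select count(*) from (select v from t where v is not null group by v) tmp"""
    # Job 1: GROUP BY v. Values are hash-partitioned, so each reducer
    # deduplicates only its own share of the values, in parallel.
    reducers = defaultdict(set)
    for v in values:
        if v is None:
            continue  # filtering NULLs keeps them from piling up on one reducer
        reducers[hash(v) % num_reducers].add(v)
    # Job 2: count(*) over the grouped rows; the single final reducer now sees
    # one row per distinct value instead of every raw row.
    return sum(len(group) for group in reducers.values())

uids = [1, 2, 2, 3, None, 3, 3, None, 4]
print(count_distinct_one_reducer(uids))  # 4
print(count_distinct_group_by(uids))     # 4
```

Because each value hashes to exactly one reducer, the per-reducer sets are disjoint and simply adding their sizes gives the distinct count.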
1.2.3 Table joins
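Put the large table last. In a common (reduce-side) join, Hive buffers the rows of the earlier tables in the reducer and streams the last table through it, so placing the large table last keeps reducer memory use low. In addition, filter the large table as early as possible: a smaller input means less data reaches the reduce-side join, which improves efficiency.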
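To see why table order matters, here is a minimal Python model of one reducer in a common join, assuming rows are dicts and a single reducer handles every key (real Hive buffers per key within each reducer, which this sketch simplifies away). `reduce_side_join` and its `(results, buffered_rows)` return value are hypothetical names for illustration. Every table except the last is buffered; the last one is streamed.

```python
from collections import defaultdict
from itertools import product

def reduce_side_join(tables, key):
    """Inner-join `tables` (lists of dicts) on `key`.

    Simplified model of a Hive common-join reducer: every table except the
    last is buffered in memory, and the last one is streamed row by row.
    """
    buffers = []
    for table in tables[:-1]:
        index = defaultdict(list)
        for row in table:
            if row[key] is not None:  # NULL join keys never match
                index[row[key]].append(row)
        buffers.append(index)
    buffered_rows = sum(len(rows) for index in buffers for rows in index.values())

    results = []
    for row in tables[-1]:  # streamed: never held in memory as a whole
        matches = [index.get(row[key], []) for index in buffers]
        for combo in product(*matches):
            results.append((*combo, row))
    return results, buffered_rows

small = [{"id": i, "name": f"dept{i}"} for i in range(3)]
big = [{"id": i % 3, "amount": i} for i in range(1000)]

rows, buffered = reduce_side_join([small, big], "id")  # large table last
print(len(rows), buffered)  # 1000 3
rows, buffered = reduce_side_join([big, small], "id")  # large table first
print(len(rows), buffered)  # 1000 1000
```

Both orders produce the same 1000 joined rows; only the number of rows the reducer must hold in memory changes.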
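Join on the same column. If possible, use the same column in every join condition: when all joins share the same column, Hive handles them in a single MapReduce job no matter how many tables are joined; otherwise each change of join column starts a new MapReduce job.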
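The same-column rule can be expressed as a small counting function. This is a simplified model written for illustration, not Hive's planner: `count_join_jobs` is a hypothetical name, and it assumes each `JOIN ... ON x = y` is given as a `(x, y)` pair in query order. A join that reuses a column already in the current join key stays in the current job; any other join opens a new one.

```python
def count_join_jobs(conditions):
    """Estimate how many MapReduce jobs a chain of equi-joins needs.

    conditions: one (left_column, right_column) pair per successive
    `JOIN ... ON left_column = right_column`, in query order.
    """
    jobs = 0
    current_key = set()
    for left, right in conditions:
        if left in current_key or right in current_key:
            current_key |= {left, right}  # same join key: stays in the current job
        else:
            jobs += 1                     # different key: a new MapReduce job
            current_key = {left, right}
    return jobs

# a JOIN b ON a.key = b.key1 JOIN c ON c.key = b.key1
print(count_join_jobs([("a.key", "b.key1"), ("c.key", "b.key1")]))  # 1
# a JOIN b ON a.key = b.key1 JOIN c ON c.key = b.key2
print(count_join_jobs([("a.key", "b.key1"), ("c.key", "b.key2")]))  # 2
```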
1.2.4 Configuration tuning
2.1 Queries that do not run MapReduce
```sql
explain select * from dept_et limit 1;
```

```
STAGE DEPENDENCIES:
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-0
    Fetch Operator
      limit: 1
      Processor Tree:
        TableScan
          alias: dept_et
          Statistics: Num rows: 1 Data size: 322 Basic stats: COMPLETE Column stats: NONE
          Select Operator
            expressions: id (type: int), name (type: string), city (type: string)
            outputColumnNames: _col0, _col1, _col2
            Statistics: Num rows: 1 Data size: 322 Basic stats: COMPLETE Column stats: NONE
            Limit
              Number of rows: 1
              Statistics: Num rows: 1 Data size: 322 Basic stats: COMPLETE Column stats: NONE
              ListSink
```

The plan has a single Stage-0 consisting only of a Fetch Operator: Hive reads dept_et directly and applies the limit itself, so no MapReduce job is launched.
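The Stage-0 plan above is a chain of operators that runs in the client: Fetch Operator -> TableScan -> Select Operator -> Limit -> ListSink. The sketch below models that chain with Python generators, assuming the table is an in-memory list of dicts; all function names and the `rows_read` counter are hypothetical. Hive's own operators push rows downstream rather than being pulled, so this is an analogy that only shows why no job is needed and, in this model, how Limit cuts the scan short.

```python
def table_scan(rows, stats):
    """TableScan: read rows directly from the table (an in-memory list here)."""
    for row in rows:
        stats["rows_read"] += 1
        yield row

def select_operator(rows, columns):
    """Select Operator: keep only the listed columns (_col0, _col1, ...)."""
    for row in rows:
        yield tuple(row[c] for c in columns)

def limit_operator(rows, n):
    """Limit: stop pulling from upstream once n rows have been produced."""
    if n <= 0:
        return
    for count, row in enumerate(rows, start=1):
        yield row
        if count >= n:
            return

def fetch(rows, columns, n):
    """Fetch Operator -> TableScan -> Select -> Limit -> ListSink, in one process."""
    stats = {"rows_read": 0}
    # ListSink: collect the final rows for the client
    result = list(limit_operator(select_operator(table_scan(rows, stats), columns), n))
    return result, stats["rows_read"]

dept_et = [{"id": i, "name": f"d{i}", "city": "c"} for i in range(1, 101)]
print(fetch(dept_et, ["id", "name", "city"], 1))  # ([(1, 'd1', 'c')], 1)
```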
2.2 join
```sql
explain select * from dept_et et join dept_mg mg on et.id = mg.id;
```

```
<!-- Job flow: Stage-4 => Stage-3 => Stage-0 (end) -->
STAGE DEPENDENCIES:
  Stage-4 is a root stage
  Stage-3 depends on stages: Stage-4
  Stage-0 depends on stages: Stage-3

STAGE PLANS:
  <!-- Step 1 (Stage-4): scan mg (dept_mg mg) with the automatically added filter predicate id is not null.
       Note that here the hash table is built from the later table in the join, mg.
       "Map Reduce Local Work" is not a distributed job: because the test tables are very small, Hive converts the join into a map join.
       A local task reads mg and builds an in-memory hash table keyed on id (HashTable Sink Operator); et is still processed by the MapReduce job in Stage-3. -->
  Stage: Stage-4
    Map Reduce Local Work
      Alias -> Map Local Tables:
        mg
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        mg
          TableScan
            alias: mg
            Statistics: Num rows: 1 Data size: 79 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: id is not null (type: boolean)
              Statistics: Num rows: 1 Data size: 79 Basic stats: COMPLETE Column stats: NONE
              HashTable Sink Operator
                keys:
                  0 id (type: int)
                  1 id (type: int)

  <!-- Step 2 (Stage-3): a map-only MapReduce job (there is no Reduce Operator Tree).
       It scans et and, in each map task, probes the hash table from Stage-4 (Map Join Operator).
       It outputs _col0, _col1, _col2, _col6, _col7, _col8, i.e. the * in the query: 6 columns in total.
       File Output writes the result to a temporary file (compressed: false, i.e. uncompressed). -->
  Stage: Stage-3
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: et
            Statistics: Num rows: 1 Data size: 322 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: id is not null (type: boolean)
              Statistics: Num rows: 1 Data size: 322 Basic stats: COMPLETE Column stats: NONE
              Map Join Operator
                condition map:
                     Inner Join 0 to 1
                keys:
                  0 id (type: int)
                  1 id (type: int)
                outputColumnNames: _col0, _col1, _col2, _col6, _col7, _col8
                Statistics: Num rows: 1 Data size: 354 Basic stats: COMPLETE Column stats: NONE
                Select Operator
                  expressions: _col0 (type: int), _col1 (type: string), _col2 (type: string), _col6 (type: int), _col7 (type: string), _col8 (type: string)
                  outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5
                  Statistics: Num rows: 1 Data size: 354 Basic stats: COMPLETE Column stats: NONE
                  File Output Operator
                    compressed: false
                    Statistics: Num rows: 1 Data size: 354 Basic stats: COMPLETE Column stats: NONE
                    table:
                        input format: org.apache.hadoop.mapred.TextInputFormat
                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      Local Work:
        Map Reduce Local Work

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
```
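The two stages of the map join above can be sketched in Python as follows. This is a simulation under stated assumptions, not Hive's implementation: tables are lists of dicts, the two input splits of et are made up, and `build_hash_table` and `map_join_task` are hypothetical names. The point is the shape of the plan: the small table is turned into a hash table once, each map task probes it, and nothing is shuffled to reducers.

```python
def build_hash_table(small_rows, key):
    """Stage-4 (Map Reduce Local Work): Filter (key is not null) + HashTable Sink."""
    hash_table = {}
    for row in small_rows:
        if row[key] is not None:
            hash_table.setdefault(row[key], []).append(row)
    return hash_table

def map_join_task(split, hash_table, key):
    """One Stage-3 map task: Filter + Map Join Operator over one split of the big table.
    Matches are emitted directly; there is no shuffle and no reduce phase."""
    for row in split:
        if row[key] is None:
            continue
        for match in hash_table.get(row[key], []):
            yield (*row.values(), *match.values())  # et.* then mg.*: 6 columns

et = [{"id": 1, "name": "a", "city": "x"}, {"id": 2, "name": "b", "city": "y"},
      {"id": None, "name": "c", "city": "z"}]
mg = [{"id": 1, "name": "m1", "city": "p"}, {"id": 3, "name": "m3", "city": "q"}]

table = build_hash_table(mg, "id")  # built once, then shared by every map task
splits = [et[:2], et[2:]]           # two hypothetical input splits of et
print([r for s in splits for r in map_join_task(s, table, "id")])
# [(1, 'a', 'x', 1, 'm1', 'p')]
```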
2.3 group by
```sql
explain select city, sum(id) from dept_et group by city;
```

```
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  <!-- Stage definitions: a "Map Reduce" stage such as Stage-1 corresponds to one MapReduce job; Stage-0 is the final fetch -->
  Stage: Stage-1
    Map Reduce
      <!-- Map phase -->
      Map Operator Tree:
          TableScan // table scan
            alias: dept_et
            Statistics: Num rows: 3 Data size: 322 Basic stats: COMPLETE Column stats: NONE // estimated statistics for table dept_et
            Select Operator // column pruning: only the two columns city (type: string) and id (type: int) are needed
              expressions: city (type: string), id (type: int)
              outputColumnNames: city, id
              Statistics: Num rows: 3 Data size: 322 Basic stats: COMPLETE Column stats: NONE
              <!-- Map-side partial aggregation: city is the group key and sum(id) is computed per city.
                   mode: hash means each map task keeps its partial results in an in-memory hash table.
                   Output: _col0, _col1 (city, partial sum(id)) -->
              Group By Operator
                aggregations: sum(id) // aggregate function applied per group => sum(id)
                keys: city (type: string)
                mode: hash
                outputColumnNames: _col0, _col1
                Statistics: Num rows: 3 Data size: 322 Basic stats: COMPLETE Column stats: NONE
                <!-- Map-side output -->
                Reduce Output Operator
                  key expressions: _col0 (type: string) // the map output key is _col0 (city)
                  sort order: +
                  Map-reduce partition columns: _col0 (type: string) // rows are routed to reducers by the hash of city
                  Statistics: Num rows: 3 Data size: 322 Basic stats: COMPLETE Column stats: NONE
                  value expressions: _col1 (type: bigint) // the map output value is _col1 (partial sum(id))
      <!-- Reduce phase: merge the outputs of the map tasks. The key is KEY._col0 (the city emitted by the maps),
           and the reducer computes sum(VALUE._col0), i.e. the sum of the partial sum(id) values.
           The output is again _col0, _col1 (city, sum(id)) -->
      Reduce Operator Tree:
        Group By Operator
          aggregations: sum(VALUE._col0)
          keys: KEY._col0 (type: string)
          mode: mergepartial // merge the partial results produced by multiple map tasks
          outputColumnNames: _col0, _col1
          Statistics: Num rows: 1 Data size: 107 Basic stats: COMPLETE Column stats: NONE
          <!-- Reduce-side output: written to a temporary file, uncompressed -->
          File Output Operator
            compressed: false
            Statistics: Num rows: 1 Data size: 107 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.TextInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
```
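The hash / mergepartial pair in this plan is a two-level aggregation. The Python sketch below simulates it under assumptions made for illustration: each split is a list of dicts with non-NULL `city` and `id`, and the function names and `num_reducers` parameter are hypothetical. Map tasks compute a partial sum per city, the shuffle routes each partial result by the hash of city, and each reducer adds up the partial sums it receives.

```python
from collections import defaultdict

def partial_sum_map(split):
    """Map-side Group By Operator, mode: hash: partial sum(id) per city for one split."""
    partial = defaultdict(int)
    for row in split:
        partial[row["city"]] += row["id"]
    return partial

def shuffle_by_city(partials, num_reducers):
    """Reduce Output Operator: key _col0 (city) picks the reducer, value is _col1."""
    reducers = [defaultdict(list) for _ in range(num_reducers)]
    for partial in partials:
        for city, value in partial.items():
            reducers[hash(city) % num_reducers][city].append(value)
    return reducers

def merge_partial_reduce(inputs):
    """Reduce-side Group By Operator, mode: mergepartial: sum(VALUE._col0) per KEY._col0."""
    return {city: sum(values) for city, values in sorted(inputs.items())}

def group_by_sum(splits, num_reducers=2):
    result = {}
    for inputs in shuffle_by_city([partial_sum_map(s) for s in splits], num_reducers):
        result.update(merge_partial_reduce(inputs))
    return result

splits = [
    [{"id": 1, "city": "bj"}, {"id": 2, "city": "sh"}],
    [{"id": 3, "city": "bj"}],
]
print(sorted(group_by_sum(splits).items()))  # [('bj', 4), ('sh', 2)]
```

The map-side partial aggregation is what lets the reducer receive one value per city per map task instead of every raw row.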
2.4 distinct
2.4.1 A single distinct column
```sql
explain select city, count(distinct name) from dept_et group by city;
```

```
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      <!-- Map side.
           Input: scan dept_et and select the raw values of city and name.
           Processing: the group-by column (city) and the distinct column (name) together form the key, and count(DISTINCT name) is evaluated.
           Output: _col0, _col1, _col2 (city, name, count(DISTINCT name)) -->
      Map Operator Tree:
          TableScan
            alias: dept_et
            Statistics: Num rows: 1 Data size: 322 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: city (type: string), name (type: string) // no computed expressions; the raw column values are selected
              outputColumnNames: city, name
              Statistics: Num rows: 1 Data size: 322 Basic stats: COMPLETE Column stats: NONE
              Group By Operator
                aggregations: count(DISTINCT name)
                keys: city (type: string), name (type: string)
                mode: hash
                outputColumnNames: _col0, _col1, _col2
                Statistics: Num rows: 1 Data size: 322 Basic stats: COMPLETE Column stats: NONE
                Reduce Output Operator
                  key expressions: _col0 (type: string), _col1 (type: string)
                  sort order: ++
                  Map-reduce partition columns: _col0 (type: string)
                  Statistics: Num rows: 1 Data size: 322 Basic stats: COMPLETE Column stats: NONE
      <!-- Reduce side: rows are partitioned by _col0 (city) only but sorted by (city, name) (sort order: ++),
           so each reducer receives every name of a given city, in sorted order.
           It aggregates again with _col0 as the key, counting the distinct name values per city, and writes the result to a temporary file. -->
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(DISTINCT KEY._col1:0._col0)
          keys: KEY._col0 (type: string)
          mode: mergepartial
          outputColumnNames: _col0, _col1
          Statistics: Num rows: 1 Data size: 322 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            Statistics: Num rows: 1 Data size: 322 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.TextInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
```
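The key trick in this plan is "partition by city, sort by (city, name)". The Python sketch below simulates it, assuming rows are dicts with non-NULL `city` and `name` (NULL handling is left out) and using hypothetical function names. Each map task deduplicates its own (city, name) pairs; the shuffle sends all pairs of one city to the same reducer in sorted order, so duplicates coming from different map tasks end up next to each other and the reducer counts a new distinct name whenever the key changes.

```python
def distinct_pairs_map(split):
    """Map-side Group By on (city, name), mode: hash: each pair leaves the split once."""
    return {(row["city"], row["name"]) for row in split}

def shuffle_partition_by_city(map_outputs, num_reducers):
    """Reduce Output Operator: partition on _col0 (city) only, sort on (city, name)."""
    reducers = [[] for _ in range(num_reducers)]
    for pairs in map_outputs:
        for city, name in pairs:
            reducers[hash(city) % num_reducers].append((city, name))
    return [sorted(keys) for keys in reducers]

def count_distinct_reduce(sorted_keys):
    """Reduce-side Group By, mode: mergepartial: keys arrive sorted, so a new
    distinct name is simply a key that differs from the previous one."""
    counts = {}
    previous = None
    for city, name in sorted_keys:
        if (city, name) != previous:
            counts[city] = counts.get(city, 0) + 1
        previous = (city, name)
    return counts

def count_distinct_by_city(splits, num_reducers=2):
    result = {}
    map_outputs = [distinct_pairs_map(s) for s in splits]
    for sorted_keys in shuffle_partition_by_city(map_outputs, num_reducers):
        result.update(count_distinct_reduce(sorted_keys))
    return result

splits = [
    [{"city": "bj", "name": "a"}, {"city": "bj", "name": "b"}, {"city": "sh", "name": "a"}],
    [{"city": "bj", "name": "a"}, {"city": "sh", "name": "c"}, {"city": "sh", "name": "a"}],
]
print(sorted(count_distinct_by_city(splits).items()))  # [('bj', 2), ('sh', 2)]
```

This also shows where the skew risk in 1.2.2 comes from: because the partition key is city alone, one very large city sends all of its names to a single reducer.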
2.4.2 Multiple distinct columns
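```sql
select dealid, count(distinct uid), count(distinct `date`) from `order` group by dealid;
```

order and date are reserved words in HiveQL, so they are quoted with backticks here.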