基本概念
map-reduce1.0
例子:
hadoop streaming
- 用语言驱动map-reduce的话,使用的hadoop streaming命令,可以通过python,php,java来驱动;
- 命令参数列表如下:
-input <path> |
输入数据路径 |
-output <path> |
输出数据路径 |
-mapper <cmd|JavaClassName> |
mapper可执行程序或Java类 |
-reducer <cmd|JavaClassName> |
reducer可执行程序或Java类 |
-file <file> Optional |
分发本地文件 |
-cacheFile <file> Optional |
分发HDFS文件 |
-cacheArchive <file> Optional |
分发HDFS压缩文件 |
-numReduceTasks <num> Optional |
reduce任务个数 |
-jobconf | -D NAME=VALUE Optional |
作业配置参数 |
-combiner <JavaClassName>Optional |
Combiner Java类 |
-partitioner <JavaClassName>Optional |
Partitioner Java类 |
-inputformat <JavaClassName> Optional |
InputFormat Java类 |
-outputformat <JavaClassName> Optional |
OutputFormat Java类 |
-inputreader <spec> Optional |
InputReader配置 |
-cmdenv <n>=<v> Optional |
传给mapper和reducer的环境变量 |
-mapdebug <path> Optional |
mapper失败时运行的debug程序 |
-reducedebug <path> Optional |
reducer失败时运行的debug程序 |
-verbose Optional |
详细输出模式 |
map和 reduce task的个数设置问题
参考资料: https://www.cnblogs.com/xiangyangzhu/p/5278328.html
reduce task的个数 决定 map task的个数,reduce task的个数是人为指定的(??存疑,还有一种说法是文件大小和block的关系决定map task的个数)
MapReduce作业中Map Task数目的确定:
1)MapReduce从HDFS中分割读取Split文件,通过Inputformat交给Mapper来处理。Split是MapReduce中最小的计算单元,一个Split文件对应一个Map Task
2)默认情况下HDFS种的一个block,对应一个Split。
3)当执行Wordcount时:
(1)一个输入文件小雨64MB,默认情况下则保存在hdfs上的一个block中,对应一个Split文件,所以将产生一个Map Task。
(2)如果输入一个文件为150MB,默认情况下保存在HDFS上的三个block中,对应三个Split文件,所以将产生三个Map Task。
(3)如果有输入三个文件都小于64MB,默认情况下会保存在三个不同的block中,也将产生三个Map Task。
4)用户可自行指定block与split的关系,HDSF中的一个block,一个Split也可以对应多个block。Split与block的关系都是一对多的关系。
5)总结MapReduce作业中的Map Task数目是由:
(1)输入文件的个数与大小
(2)hadoop设置split与block的关系来决定。
MapReduce作业中Reduce Task数目的指定:
1)JobClient类中submitJobInternal方法中指定:int reduces=jobCopy.getNumReduceTasks();
2)而JobConf类中,public int getNumReduceTasks(){return geInt("mapred.reduce.tasks",1)}
因此,Reduce Task数目是由mapred.reduce.tasks指定,如果不指定则默认为1.
这就很好解释了wordcount程序中的reduce数量为1的问题,这时候map阶段的partition(分区)就为1了。
other说法
增加task的数量,一方面增加了系统的开销,另一方面增加了负载平衡和减小了任务失败的代价;
map task的数量即mapred.map.tasks的参数值,用户不能直接设置这个参数。Input Split的大小,决定了一个Job拥有多少个map。默认input split的大小是64M(与dfs.block.size的默认值相同)。然而,如果输入的数据量巨大,那么默认的64M的block会有几万甚至几十万的Map Task,集群的网络传输会很大,最严重的是给Job Tracker的调度、队列、内存都会带来很大压力。mapred.min.split.size这个配置项决定了每个 Input Split的最小值,用户可以修改这个参数,从而改变map task的数量。
一个恰当的map并行度是大约每个节点10-100个map,且最好每个map的执行时间至少一分钟。
reduce task的数量由mapred.reduce.tasks这个参数设定,默认值是1。
合适的reduce task数量是0.95或者0.75*( nodes * mapred.tasktracker.reduce.tasks.maximum), 其中,mapred.tasktracker.tasks.reduce.maximum的数量一般设置为各节点cpu core数量,即能同时计算的slot数量。对于0.95,当map结束时,所有的reduce能够立即启动;对于1.75,较快的节点结束第一轮reduce后,可以开始第二轮的reduce任务,从而提高负载均衡
性能优化
Reducers数过多的情况:
生成了很多个小文件(最终输出文件由reducer决定,一个reducer输出一个文件),那么如果这些小文件作为下一个Job输入,则会出现小文件过多需要进行合并的问题。而且启动和初始化reducer需要耗费时间和资源。
Reducers数过少:
执行耗时,并且可能出现数据倾斜
Reducer个数的决定:
默认情况下,Hive分配reducer个数由下列参数决定:
参数1:hive.exec.reducers.bytes.per.reducer(默认为1G)
参数2:hive.exec.reducers.max(默认为999)
计算reducer数的公式:
N=min(参数2,总输入数据量/参数1)
即默认一个reduce处理1G数据量。
注意:与mapred.map.tasks参数不同,如果设置了setmapred.reduce.tasks参数的数值,忽略上述计算,reducer个数可以由mapred.reduce.tasks直接指定。