1.MapReduce概述
MapReduce是Hadoop的计算引擎,是Hadoop的三大组件之一.
但随着技术的发展,MapReduce臃肿繁杂,正在被其它的计算引擎所慢慢取代,这里就了解一下MapReduce的执行过程就行了
2.MapReduce的执行过程
MapReduce总体上分为4个执行阶段:InputFormat,MapTask.ReduceTask,OutputFormat.然后中间夹杂shuffle过程.
2.1 InputFormat:
默认是FileInputFormat的TextInputFormat类,获取输入分片.使用默认的RecordReader:LineRecordReader将输入分片的每一行按 分成Key-Value.key偏移量,Value是行内容.调用一次Map方法
一个输入分片对应一个Map方法.
(对于行分断到两个分片的情况,每一个分片都必须读到行断为止,
如果本片读完都没有找到,会读下一片的开头直到读到行断符.然后将这行归属本片处理.而之后的分片都会跳过第一个行断直接从第二行读取)
2.2 MapTask:
首先就是数据写入内存的一个缓冲区,随着缓冲区数据 达到阀值时(默认是0.8(mapreduce.map.sort.spill.percent,也就是80M) ,就会启动一个溢写线程
溢写线程负责将缓冲区数据形成一个pill文件写入磁盘.当然,会经历一些提前处理过程.这个过程就是Shuffle(Map-Shuffle)
整个Shuffle过程如下:
首先要做的就是对缓存区的数据进行分区再排序.(默认HashPartitioner,就是取Key的Hash值,也可以自定义分区,就是继承Partitioner类). 分区内再按Key排序
形成排序之后的结果<Partition,Key,Value>,此时检查如果设定了Combiner,这个时候就会用Combiner对这个结果进行处理.
最终结果,作为一个pill文件(Map的pill,是不同文件块的部分),写到磁盘.
因为每次溢写都会形成一个pill文件,这些pill文件会被归并排序为一个索引文件和一个数据文件,这个过程叫做Merge
如果设定了文件压缩(mapreduce.map.output.compress,默认为false),pill-merge的过程中,数据就会被压缩.
(文件压缩使用mapreduce.map.output.compress.codec(默认org.apache.hadoop.io.compress.DefaultCodec)库压缩)
当所有的pill文件都Merge完毕,会形成最终结果 key,[values]的东西(values其实是来自不同pill文件的结果),这就是Map的输出数据
此时就会删除所有的pill文件,并通知AppMaster Map完毕,让AppMatser准备联系下一步(让Reduce来拉取输出)
2.3 ReduceTask
Reduce-Shuffle
接下来进行Reduce-Shuffle 阶段。(不一定在Map全部结束后,事实上MapTask完成任务数超过总数的5(默认)%后,就开始调度执行ReduceTask任务了)
拉取: ReduceTask默认启动5个copy线程到完成的MapTask任务节点上分别copy一份属于自己的数据(Http)
拉取过程中,数据也是写入缓冲区,此时也发生溢写(与Map的溢写处理相同,内存=>磁盘,阈值时写入pill文件)
合并:随着拉取的不断进行,也会形成很多的pill(Reduce的pill,是不同Map的部分),这时会继续进行Merge,将不同Map的输出归并排序为更大的排序文件(与Map的Merge处理相同)
注意在Reduce-Shuffle过程中,如果有Combiner,会再对数据做一次Combiner,所以Combiner其实会在Map和Reduce各执行一次.
随着数据拉取,合并最终完成,会再形成一个最终结果key,[values]的东西(values来自不同的Map输出),这就是Reduce的输入数据
reduceTask
然后开始reduceTask就会将这个最终结果交给reduced函数进行处理,函数会作用在每一个排序的Key上
2.4 OutputFormat
默认输出到HDFS上,文件名称是part-00001(当我们输出需要指定到不同于HDFS时,需要自定义输出类继承OutputFormat类)