在MapReduce计算框架中,一个应用程序被划分为Map和Reduce两个计算阶段。他们分别由一个或多个Map Task 和Reduce Task组成。
- Map Task: 处理输入数据集合中的一片数据,并将产生的若干个数据片段写到本地磁盘。
- 按照用户提供的InputFormat将对应的InputSpilt解析成一系列的key/value, 并以此交给用户编写的map()函数处理。
- 按照指定的Partitioner对数据分片,以确定每个key/value将交给哪个Reducer Task处理。
- 将数据交给用户定义的Combiner进行以此本地规约(用户没有定义则直接跳过)
- 将处理结果保存到本地磁盘。
- Reduce Task: 从每个Map Task上远程拷贝相应的数据片段,经过分组聚集和规约后,将结果写到HDFS上作为最终结果。
- 通过HTTP请求从各个已经运行完成的Map Task上拷贝对应的数据分片。
- 待数据拷贝完成,以key为关键字对所有数据进行排序。通过排序,key相同的记录聚集在一起形成若干分组。
- 将每组数据交给用户编写的reduce()函数处理。
- 将结果直接写到HSFS上面作为最终输出结果。
IFile
IFile是一种支持行压缩的存储格式。为了减少MapTask写入磁盘的数据量和跨网络传输的数据量,IFile支持按行压缩数据记录。当前Hadoop提供了ZLib(默认压缩方式)、BZip2等压缩算法。
IFile文件格式:<key-len, value-len, key, value>
排序
排序是MapReduce框架中最重要的从a组之一。Map Task和Reduce Task均会对数据(按照key)进行排序。该操作属于Hadoop的默认行为。任何应用程序均会被排序,而不管逻辑上是否需要。
对于Map Task,它会将处理的结果暂时存放到一个缓冲区,当缓冲区使用率达到一定阈值后,在对缓冲区中的数据进行以此排序。并将这些有序集合以IFile文件的形式写到磁盘上。而当数据处理完毕后,它会对磁盘上所有文件进行一次合并。已将这些文件形成一个大的有序文件。
对于Reduce Task,它从每个Map Task上面远程拷贝相应的数据文件,如果文件大小超过一定阈值,则放到磁盘,否则放到内存。如果磁盘上文件数目达到一定阈值,则进行一次合并以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后写到磁盘上。当所有数据拷贝完毕后,Reduce Task统一对内存和磁盘上的所有数据进行一次合并。
Map Task和Reduce Task的缓冲区数据合并使用Hadoop自己实现的快排算法,而IFile文件合并则使用了基于堆实现的优先队列。
快排
- 枢轴选择:使用序列的首尾和中间元素的中位数作为枢轴
- 子序列划分:两个索引i,j分别从左右两端扫描,i扫描到大于等于枢轴的等值,j扫描到小于等于枢轴的元素停止,然后交换两个元素。重复直到相遇
- 相同元素的优化: 每次划分子序列,将于枢轴相同的元素集中存放到中间位置,让它们不再参与后续的递归处理过程。即序列划分三部分:小于枢轴、等于枢轴、大于枢轴
- 减少递归次数:当子序列中元素数目小于13时,直接使用插入排序算法,不再递归。
优先队列
文件归并由类Merger完成。其采用多轮递归合并的方式。每轮选取最小的前io.sort.factor(默认是10,用户可配置)个文件进行合并。并将产生的文件重新加入带合并列表中。知道剩下的文件数目小于io.sort.factor个,此时,他会返回指向由这些文件组成的小顶堆的迭代器。
Reporter
Reporter用来完成Task周期性的向TaskTracker汇报最新进度和计数器值。TaskReporter类实现了Reporter接口,并以线程形式启动。其汇报的信息中包含两部分:
- 任务执行进度
- Map Task 而言: 使用已读取数据量占数据总量的比例作为任务当前进度值
- Reduce Task: 其可以分解为三个阶段: Shuffle、Sort、Reduce。每个阶段占任务总进度的1/3.考虑在Shuttle阶段,Reduce Task需要从M(M为Map Task数目)个Map Task上读取数据。因此,可被分解为M个阶段,每个阶段占Shuffle进度的1/M。
- 任务计数器值: 是由Hadoop提供的,用于实现跟踪任务运行进度的全局技术功能。任务计数器由两部分组成<name, value>.计数器以组为单位进行管理,一个计数器属于一个计数器组。Hadoop规定一个作业最多包含120个计数器(可通过参数mapreduce.job.counters.limit设定),50个计数器组。