把输入文件切分成块、由块变成key和value到Mapper类map(k1, v1)的过程
控制输入文件切分、生成key和value传递到Mapper类的map()方法使用(InputFormat,它是一个抽象类,实现则由各个子类完成):
通常在每个MapReduce的Job中,都会调用job.setInputFormatClass(Class<? extends InputFormat> cls)方法,这个方法的作用是指定Job要使用Hadoop中的哪个InputFormat的子类去切分输入文件。
这个类的定义如下:
public abstract class InputFormat<K, V> {
//将输入文件逻辑切片,一个InputSplit会分配给一个Mapper处理
public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;
//将每个InputSplit按照InputFormat<K, V>中的Key和Value类型进行切分为RecordReader<K, V>
public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context ) throws IOException, InterruptedException;
}
逻辑切片(InputSplit,它是一个抽象类,实现则由各个子类完成):
其中有getSplits()和createRecordReader()两个方法,getSplits()方法负责将一个输入文件进行逻辑切片。例如一个1GMB的文件按照HDFS默认128MB一个块大小切分为8个InputSplit,那么getSplit()
方法的List<InputSplit>集合有8个InputSplit。在子类TextInputFormat中重写的getSplit()方法会从JobContext中获取作业中设置的输入路径INPUT_DIR中的文件,然后按照HDFS默认块大小进行计算逻辑
切分。注意:FileInputFormat会使用一个默认的过滤器用来排除隐藏文件中以“.”和“_”开头的文件。
提供真正的数据给Mapper类的map方法使用(RecordReader,它是一个抽象类,实现则由子类完成):
这时候输入文件在逻辑上已经切分好并存在List<InputSplit>中了,createRecordReader()方法确定了将每个InputSplit对应的块大小循环调用进行物理切分变成key和value。每一次调用RecordReader
的时候都会调用Mapper类的map()方法并将key和value提供给Mapper类的map()方法使用。在子类TextInputFormat<LongWritable, Text>中规定了key和value类型是LongWritable和Text类型,然后将数据指定
为UTF-8编码传递给LineRecordReader,LineRecordReader会将key处理成偏移量,将value处理成行文本提供给Mapper类的map()方法使用。
Shuffle过程
Mapper端的Shuffle
一个Map任务处理输入数据的一部分,当Map任务完成后会生成一些中间结果文件,这些中间结果文件会作为Reduce任务的输入数据,然后Reduce会将前面若干个Map任务的输出汇总到一起并输出。当Map
开始产生输出时,这些数据会被写入到内存中的一个缓冲区(每个Map任务都有一个用来写输出数据的循环内存缓冲区)并做一些预排序以提升效率。这个缓冲区默认大小是100MB,可以通过io.sort.mb属性
来设置。当这个缓冲区到一个特定的阀值(io.sort.mb*io.sort.spill.percent,其中io.sort.spill.percent默认是0.80)80MB时,系统将启动一个后台线程把缓冲区的数据spill溢写到磁盘。溢写过程中
Map任务会继续输出到这个缓冲区,如果缓冲区已满时Map就会阻塞直到溢写完成(这种情况导致了MapReduce作业会延迟)。spill线程在把缓冲区的数据写入磁盘(操作系统的本地文件系统)时会进行一个
二次快速排序,先根据Partition(我们在写MapReduce代码时设置的分区数)排序,然后每个Partition中再按key排序。输出一个索引文件和数据文件。如果代码中设置了Combiner将在排序输出的基础上运行。
Combiner是一个本地Reducer。它在执行Map任务的节点上运行,先对Map的输出做一次简单的Reduce让Map输出的更紧凑,更少的数据会被写入磁盘。当spill文件归并完成后Map将删除所有的临时spill文件。
并告诉ApplicationMaster任务完成。这时候Reduce端通过HTTP获取对应的数据。
Reducer端的Shuffle
复制阶段:
与Map任务输出到本地不同,Reduce任务通常最总会输出到HDFS中。每个Map任务完成的时间不同,因此只要有一个Map任务完成,Reduce任务就开始复制其输出。这就是Reduce任务的复制阶段。Reduce
任务有少量复制线程来并行获取Map输出。默认是5个复制线程,可以通过属性mapred.reduce.parallel.copies设置。
合并阶段:
Reduce Merge分为三种,即内存到内存、内存到磁盘、磁盘到磁盘。默认内存到内存是不启用的,当缓冲区的数据到一定阀值时会启动内存到磁盘的Merge,直到没有Map端数据时才结束。因为Map端没
结束时会一直运行内存到磁盘的Merge。这时磁盘上有很多溢写文件。一旦Map端没有数据时会启动磁盘到磁盘的Merge,直到生成最终的文件。
如果Map输出小会被复制到Reduce任务JVM的内存缓冲区中。如果Map输出大则会复制到磁盘。随着磁盘上数据越来越大,后台线程会将它们合并成更大的排好序的文件。如果Map任务的输出是压缩过的文
件则必须在内存中解压缩才能合并。在合并Map输出时会维持其顺序排序。Reduce端默认的合并因子是10,也就是每次将10个文件合并成一个文件。最后一次合并有可能来自内存和磁盘中。
输入结束阶段:
当数据都合并完成后会生成一个最终文件,当Reduce的输入文件已经确定,这时候Shuffle结束。然后Reduce任务运行,将结果写入HDFS。