@
1. 准备阶段
运行Job.waitForCompletion()
,先使用JobSubmitter
提交Job,在提交之前,会在Job的作业目录中生成以下文件:
job.split
:当前Job的切片信息,有几个切片对象
job.splitmetainfo
:切片对象的属性信息
job.xml
:job所有的属性配置
2. 提交阶段
本地模式
LocalJobRunner
进行提交,如果是HDFS,使用了yarn,则是YARNJobRunner
创建一个LocalJobRunner.Job()
job启动:Job.start()
Map阶段
- 采用线程池提交多个MapTaskRunable线程
- 每个MapTaskRunable线程上,实例化一个
MapTask
对象 - 每个MapTask对象,最终实例化一个
Mapper
Mapper.run()
- 线程运行结束,会在线程的作业目录中生成 file.out文件,保存MapTask输出的所有的
key-value
MapTaskRunable------>MapTask--------->Mapper--------->Mapper.run()------->Mapper.map()
阶段定义
如果有ReduceTask,MapTask运行期间,分为 map(67%)---sort(33%) 两部分
如果没有ReduceTask,MapTask运行期间,只有map(100%)
map: 使用RecordReader
将切片中的数据读入到Mapper.map(),直至写出:context.write(key,value)
Reduce阶段
- 采用线程池提交多个ReduceTaskRunable线程
- 每个ReduceTaskRunable线程上,实例化一个
ReduceTask
对象 - 每个ReduceTask对象,实例化一个
Reducer
- reducer.run()
- 线程运行结束,会在输出目录中生成
part-r-000x
文件,保存ReduceTask输出的所有的key-value
,即最后结果
ReduceTaskRunable------->ReduceTask------>Reducer----->Reducer.run()------>Reducer.reduce()
阶段定义
- copy:使用
shuffle
线程拷贝MapTask指定分区的数据 - sort:将拷贝的所有的分区的数据汇总后,排序
- reduce:对排好序的数据,进行合并
- Shuffle的含义为洗牌,将Map阶段写出的数据,进行洗牌(将数据整理的有序,方便Reducer进行reduce)!
Shuffle阶段横跨MapTask和RedcueTask,在MapTask端也有Shuffle,在RedcueTask也有Shuffle!
具体Shuffle阶段指MapTask的map之后到RedcuceTask的reduce之前!
YARN上运行
在提交Job后,创建MRAppMaster进程!
由MRAppMaster,和RM申请,申请启动多个MapTask,多个ReduceTask
Container------>MapTask--------->Mapper--------->Mapper.run()------->Mapper.map()
Container------->ReduceTask------>Reducer----->Reducer.run()------>Reducer.reduce()