MapTask执行通过执行.run方法:
1.生成TaskAttemptContextImpl实例,此实例中的Configuration就是job本身。
2.得到用户定义的Mapper实现类,也就是map函数的类。
3.得到InputFormat实现类。
4.得到当前task相应的InputSplit.
5.通过InputFormat,得到相应的RecordReader。
6.生成RecordWriter实例,
假设reduce个数为0,生成为MapTask.NewDirectOutputCollector
假设reduce个数不为0,但肯定是一个大于0的数,生成MapTask.NewOutputCollector
假设是有reduce的情况,在collector中会生成一个buffer的collector用来进行内存排序。
通过mapreduce.job.map.output.collector.class配置,默觉得MapTask.MapOutputBuffer
在MapOutputBuffer中:
通过mapreduce.map.sort.spill.percent配置内存flush的比值,默觉得0.8
spill的中文意思是溢出。
通过mapreduce.task.io.sort.mb配置内存bufer的大小。默认是100mb
通过mapreduce.task.index.cache.limit.bytes配置(还不知道是做什么的),默觉得1024*1024
提示,这个配置是用来cache进行spill操作的index的大小。当spillindex达到此值的时候,
须要写入spillindex的文件。
通过map.sort.class配置排序实现类,默觉得QuickSort。高速排序
通过mapreduce.map.output.compress.codec配置map的输出的压缩处理程序。
通过mapreduce.map.output.compress配置map输出是否启用压缩。
默觉得false.
MapOutputBuffer实例生成部分结束。
在生成MapTask.NewOutputCollector同一时候,会
检查是否用户有定义的Partitioner,默认是HashPartitioner。
假设生成的实例为MapTask.NewDirectOutputCollector,也就是没有Reduce的情况下。
不运行排序操作也不运行buffer的缓冲操作,直接写入到output的文件里。
通过OutputFormat的RecordWriter。
下面是mapper.run方法的运行代码:
publicvoidrun(Context context) throwsIOException, InterruptedException {
setup(context);
try{
while(context.nextKeyValue()) {
map(context.getCurrentKey(),context.getCurrentValue(), context);
}
}finally{
cleanup(context);
}
}
由上面的代码能够看出,map运行时。会运行一次setup函数。完毕时会运行一次cleanup函数。
中间仅仅要有值就会调用map函数。
当中run中传入的context生成由来:
if(job.getNumReduceTasks() == 0) {
output =
newNewDirectOutputCollector(taskContext,job, umbilical, reporter);
}else{
output = newNewOutputCollector(taskContext,job, umbilical, reporter);
}
MapContextImpl实例。包括input(RecordReader)与output,也就是上面提到的collector.
org.apache.hadoop.mapreduce.MapContext<INKEY,INVALUE, OUTKEY, OUTVALUE>
mapContext =
newMapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job,getTaskID(),
input, output,
committer,
reporter, split);
WrappedMapper.Context实例。包括MapContextImpl实例。
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
mapperContext =
newWrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
mapContext);
接着看mapper.run中的context.nextKeyValue()函数:
调用WrappedMapper.Context.nextKeyValue()函数,-->
调用MapContextImpl.nextKeyValue函数,-->
调用RecordReader.nextKeyValue函数,RecordReader不在说明。
在map函数对过程处理完毕后,会通过context.write写入分析的数据,
context.write(word,one);
看看此部分是怎样运行的:
调用WrappedMapper.Context.write-->
调用MapContextImpl.write-->TaskInputOutputContextImpl.write-->
MapTask.NewOutputCollector.write/MapTask.NewDirectOutputCollector.write
MapTask.NewDirectOutputCollector.write:
这个里面没什么能够说的,直接写入到输出文件里。
NewDirectOutputCollector(MRJobConfigjobContext,
JobConf job,TaskUmbilicalProtocol umbilical, TaskReporter reporter)
throwsIOException, ClassNotFoundException, InterruptedException {
............................................
out= outputFormat.getRecordWriter(taskContext);
............................................
}
写入函数的定义
publicvoidwrite(K key, V value)
throwsIOException, InterruptedException {
reporter.progress();
longbytesOutPrev = getOutputBytes(fsStats);
直接写入文件。
out.write(key,value);
longbytesOutCurr = getOutputBytes(fsStats);
fileOutputByteCounter.increment(bytesOutCurr- bytesOutPrev);
mapOutputRecordCounter.increment(1);
}
重点来看看MapTask.NewOutputCollector.write这部分的实现:
通过Partitioner来生成reduce的partition值,调用MapOutputBuffer.collect函数。
也就是写入到buffer中。
publicvoidwrite(K key, V value) throwsIOException, InterruptedException {
collector.collect(key,value,
partitioner.getPartition(key,value, partitions));
}
MapOutputBuffer.collector:
publicsynchronized voidcollect(K key, V value, final intpartition
)throwsIOException {
reporter.progress();
检查传入的key的类型是否是job中MapOutputKeyClass的值
if(key.getClass() != keyClass){
thrownewIOException("Type mismatch in keyfrom map: expected "
+keyClass.getName()+ ", received "
+key.getClass().getName());
}
检查传入的value的类型是否是job中MapOutputValueClass的值。
if(value.getClass() != valClass){
thrownewIOException("Type mismatch in valuefrom map: expected "
+valClass.getName()+ ", received "
+value.getClass().getName());
}
检查partition是否在指定的范围内。
if(partition < 0 || partition >= partitions){
thrownewIOException("Illegal partition for" + key + "(" +
partition + ")");
}
检查sortSpillException的值是否为空,假设不为空,表示有spill错误。throwioexception
checkSpillException();
把可写入的buffer的剩余部分减去一个固定的值,并检查可用的buffer是否达到了sort与spill的值
默认是buffer的0.8的大小,假设buffer的0.8与METASIZE取于不等于0时,
得到的值可能会比0.8小METASIZE这么一点。
bufferRemaining-= METASIZE;
if(bufferRemaining<= 0) {
运行spill操作。这部分等下再进行分析
//start spill if the thread is not running and the soft limit has been
//reached
spillLock.lock();
try{
......................此部分代码先不看
} finally{
spillLock.unlock();
}
}
try{
第一次进入时,bufindex的值为0,以后的每一次是key.len+1+value.len+1的值添加。
//serialize key bytes into buffer
intkeystart = bufindex;
把key写入到此实例中的一个BlockingBuffer类型的属性bb中。这是一个buffer.
在写入时把bufferRemaining的值减去key.length的长度。这里面也会检查buffer是否够用
把key写入到kvbuffer中。同一时候把bufindex的值加上key.length。Kvbuffer就是详细的buffer.
在运行写入key/value时,首先是先把bufferRemaining的值减去key.length/value.length的长度。
同一时候检查此时bufferRemaining的值是否会小于或等于0。假设是须要先做spill操作。
否则把数据写入kvbuffer中。并把bufindex的值加上key.length/value.length
详细的写入操作请查看MapTask.Buffer中的write函数。
keySerializer.serialize(key);
这个地方有可能会出现,为什么呢,由于buffer是不停在反复使用。当使用到后面时,
前面可能会已经运行了spill操作。因此到bufindex达到最后的时候。会回到開始位置接着写。
if(bufindex <keystart) {
//wrapped the key; must make contiguous
bb.shiftBufferedKey();
keystart = 0;
}
此时的valstart的值为key结束后的下一个下标值。按key相同的方式写入value
//serialize value bytes into buffer
finalintvalstart = bufindex;
valSerializer.serialize(value);
以下这一行是一个长度为0的bytearray,不做操作。
//It's possible for records to have zero length, i.e. the serializer
//will perform no writes. To ensure that the boundary conditions are
//checked and that the kvindexinvariant is maintained, perform a
//zero-length write into the buffer. The logic monitoring this could be
//moved into collect, but this is cleaner and inexpensive. For now, it
//is acceptable.
bb.write(b0,0, 0);
通过bufmark属性标记下bufindex的值。并返回bufindex的值。此时bufindex的值是val结束的下标。
//the record must be marked after the preceding write, as the metadata
//for this record are not yet written
intvalend = bb.markRecord();
mapOutputRecordCounter.increment(1);
mapOutputByteCounter.increment(
distanceTo(keystart, valend,bufvoid));
记录kv的meta信息。此处是一个IntBuffer的缓冲区,每次向kvmeta中写入4个下标的值,
第一次时,kvindex为0。第二次是kvindex的值为kvmeta.capacity()-4的值。
也就是说第一次是从前面開始写,从第二次開始都是从后面向前面開始写。
把partition的值写入到meta的第2个下标,把keystart写入到第一个下标。
把valstart的值写入到meta的第0个下标,把value的长度写入到第三个下标。
Kvmeta的buffer是例如以下图例的样子
4byte |
4byte |
4byte |
4byte |
4byte |
4byte |
4byte |
4byte |
4byte |
4byte |
4byte |
4byte |
VALSTART(0) |
KEYSTART(1) |
PARTITION(2) |
VALLEN(3) |
|
|
|
|
VALSTART(0) |
KEYSTART(1) |
PARTITION(2) |
VALLEN(3) |
//write accounting info
kvmeta.put(kvindex+ PARTITION,partition);
kvmeta.put(kvindex+ KEYSTART,keystart);
kvmeta.put(kvindex+ VALSTART,valstart);
kvmeta.put(kvindex+ VALLEN,distanceTo(valstart, valend));
//advance kvindex
kvindex= (kvindex- NMETA+ kvmeta.capacity())% kvmeta.capacity();
} catch(MapBufferTooSmallException e) {
LOG.info("Recordtoo large for in-memory buffer: " +e.getMessage());
spillSingleRecord(key, value,partition);
mapOutputRecordCounter.increment(1);
return;
}
}
写入数据到buffer中的实现:
通过MapTask.MapOutputBuffer.Buffer.write方法
publicvoid write(byteb[], int off, intlen)
throwsIOException {
//must always verify the invariant that at least METASIZE bytes are
//available beyond kvindex,even when len== 0
bufferRemaining-= len;
if(bufferRemaining<= 0) {
...............................................
}
此处检查bufindex(kvbuffer中如今的下标值)+len是否达到了bufvoid(默认是kvbuffer的最后)
假设运行过spill操作,buffer写入到下标的最后时,又一次開始从0開始写入后,
bufvoid的值是上一次写入完毕的bufmark的值(最后一次完毕写入的下标)
也就是说如今写入已经达到buffer的最后位置。可是要写入的数据装不完。
如:要写入数据是5个byte,但如今kvbuffer最后端仅仅能写入3个byte,
此时会把于下的2个byte写入到kvbuffer的開始位置。这就是环行buffer
//here, we know that we have sufficient space to write
if(bufindex +len > bufvoid){
finalintgaplen = bufvoid- bufindex;
System.arraycopy(b,off, kvbuffer,bufindex,gaplen);
len -= gaplen;
off += gaplen;
bufindex= 0;
}
System.arraycopy(b,off, kvbuffer,bufindex,len);
bufindex+= len;
}
}
关于当bufindex的值小于keystart时,也就是环行部分又一次開始写入时,运行的shiftBufferedKey
这个部分主要是把buffer中要写入的数据超过了buffer能在最后写入的值时:
write后演示样例值:
要写入的bytearray [1,2,3,4,5]
运行写入后buffer的内容例如以下:最后仅仅能存储3个byte。这里把123写入到最后。
同一时候把45写入到最前面部分
4 |
5 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
2 |
3 |
运行shiftBufferedKey以后,此时buffer的内容变成例如以下:
1 |
2 |
3 |
4 |
5 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
protectedvoid shiftBufferedKey() throwsIOException {
此时的bufmark是上一次完毕写入的bufindex的下标值。得到最后写入的几个byte的长度。
比方上面提到的,写5个,但最后仅仅有3个byte的长度。那么这里得到的就是3.
//spillLock unnecessary; both kvendand kvindexare current
intheadbytelen = bufvoid– bufmark;
把bufvoid也就是可写入的最后下标的值改动成上一次完毕写入的最后一个下标值。
bufvoid= bufmark;
finalintkvbidx = 4 * kvindex;
finalintkvbend = 4 * kvend;
finalintavail =
Math.min(distanceTo(0,kvbidx), distanceTo(0, kvbend));
if(bufindex +headbytelen < avail) {
把环行部分,如上面最后从头開始写入的2个byte向后移动3个byte。
System.arraycopy(kvbuffer,0, kvbuffer,headbytelen, bufindex);
把bufer最后部分的3个byte写入到開始部分。
System.arraycopy(kvbuffer,bufvoid,kvbuffer,0, headbytelen);
把bufindex向后移动几个byte,并又一次计算可用的空间
bufindex+= headbytelen;
bufferRemaining-= kvbuffer.length- bufvoid;
} else{
byte[]keytmp = newbyte[bufindex];
System.arraycopy(kvbuffer,0, keytmp, 0, bufindex);
bufindex= 0;
out.write(kvbuffer,bufmark,headbytelen);
out.write(keytmp);
}
}
}
数据达到buffer的limit时,运行的spill操作:
if(bufferRemaining <= 0) {
//start spill if the thread is not running and the soft limit has been
//reached
spillLock.lock();
try{
do{
假设spillInProgress的值为true时。表示spill操作正在进行。
if(!spillInProgress){
finalintkvbidx = 4 * kvindex;
第一次运行时kvend的值为0,第二次时是kvindex的上一个值(kvend)*4,
kvend表示已经完毕的kvmeta的下标值,kvindex表示如今准备使用的下标值
finalintkvbend = 4 * kvend;
得到已经使用的字节数
//serialized, unspilledbytes always lie between kvindexand
//bufindex,crossing the equator. Note that any void space
//created by a reset must be included in "used" bytes
finalintbUsed = distanceTo(kvbidx, bufindex);
得到已经使用的字节数是否已经达到spill的配置大小,也就是buffer的0.8默认。
finalbooleanbufsoftlimit = bUsed >= softLimit;
这里表示spill完毕,回收空间,
if((kvbend + METASIZE)% kvbuffer.length!=
equator- (equator% METASIZE)){
//spill finished, reclaim space
resetSpill();
bufferRemaining= Math.min(
distanceTo(bufindex,kvbidx) - 2 * METASIZE,
softLimit- bUsed) - METASIZE;
continue;
} elseif(bufsoftlimit && kvindex!= kvend) {
发起spill操作
//spill records, if any collected; check latter, as it may
//be possible for metadataalignment to hit spill pcnt
startSpill();
finalintavgRec = (int)
(mapOutputByteCounter.getCounter()/
mapOutputRecordCounter.getCounter());
//leave at least half the split buffer for serialization data
//ensure that kvindex>= bufindex
finalintdistkvi = distanceTo(bufindex,kvbidx);
finalintnewPos = (bufindex+
Math.max(2* METASIZE- 1,
Math.min(distkvi/ 2,
distkvi/ (METASIZE+ avgRec) * METASIZE)))
% kvbuffer.length;
setEquator(newPos);
bufmark= bufindex= newPos;
finalintserBound = 4 * kvend;
//bytes remaining before the lock must be held and limits
//checked is the minimum of three arcs: the metadataspace, the
//serialization space, and the soft limit
bufferRemaining= Math.min(
//metadatamax
distanceTo(bufend,newPos),
Math.min(
//serialization max
distanceTo(newPos,serBound),
//soft limit
softLimit))- 2 * METASIZE;
}
}
} while(false);
} finally{
spillLock.unlock();
}
}
发起startSpill操作
privatevoid startSpill() {
assert!spillInProgress;
记录住最后一个完毕的kvindex的下标。
kvend= (kvindex+ NMETA)% kvmeta.capacity();
记录住标记住的最后一个完毕的kv写入在kvbuffer中的下标
bufend= bufmark;
设置spill操作正在进行
spillInProgress= true;
LOG.info("Spillingmap output");
LOG.info("bufstart= " + bufstart+ "; bufend = "+ bufmark +
";bufvoid = " + bufvoid);
LOG.info("kvstart= " + kvstart+ "("+ (kvstart* 4) +
");kvend = " + kvend+ "("+ (kvend *4) +
");length = " + (distanceTo(kvend,kvstart,
kvmeta.capacity())+ 1) + "/"+ maxRec);
此处使用了java新的线程通信的方法,notify线程的等待。
spillReady.signal();
}
此时,MapTask.MapOutputBuffer.SpillThread线程接收到signal命令:
publicvoid run() {
spillLock.lock();
spillThreadRunning= true;
try{
while(true){
spillDone.signal();
while(!spillInProgress){
假设线程发现spillInProgress的值是false时,等待,
当buffer中的数据达到sortlimit的值时,通过spillReady.signal来notify此线程。
spillReady.await();
}
try{
spillLock.unlock();
sortAndSpill();
} catch(Throwable t) {
sortSpillException= t;
} finally{
spillLock.lock();
if(bufend <bufstart) {
bufvoid= kvbuffer.length;
}
kvstart= kvend;
bufstart= bufend;
spillInProgress= false;
}
}
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
} finally{
spillLock.unlock();
spillThreadRunning= false;
}
}
}
运行排序与spill操作:
调用MapTask.MapOutputBuffer.sortAndSpill函数:
privatevoid sortAndSpill() throwsIOException, ClassNotFoundException,
InterruptedException{
//approximatethe length of the output file to be the length of the
//buffer+ header lengths for the partitions
finallongsize = (bufend>= bufstart
? bufend- bufstart
: (bufvoid- bufend) +bufstart) +
partitions* APPROX_HEADER_LENGTH;
FSDataOutputStream out = null;
try{
生成写入文件。路径通过在mapreduce.cluster.local.dir配置中写入local的mr路径
在路径下生成(attempid)_spill_(numspills).out或者output/spill(numspills).out文件。
//create spill file
finalSpillRecord spillRec = newSpillRecord(partitions);
finalPath filename =
mapOutputFile.getSpillFileForWrite(numSpills,size);
out = rfs.create(filename);
finalintmstart = kvend/ NMETA;
finalintmend = 1 + // kvendis a valid record
(kvstart>= kvend
?
kvstart
: kvmeta.capacity()+ kvstart)/ NMETA;
运行排序操作。把buffer中的数据进行排序。
排序的比較器通过MapOutputBuffer.compare,
默认是通过key进行排序。
sorter.sort(MapOutputBuffer.this,mstart, mend, reporter);
intspindex = mstart;
finalIndexRecord rec = newIndexRecord();
finalInMemValBytes value = newInMemValBytes();
for(inti = 0; i < partitions;++i) {
IFile.Writer<K, V>writer = null;
try{
longsegmentStart = out.getPos();
writer = newWriter<K, V>(job,out, keyClass,valClass,codec,
spilledRecordsCounter);
检查是否有combiner处理程序,假设没有,直接把buffer中排序后的数据写入到spill文件里。
注意,写入时,数据是按partition从小到大写入。
if(combinerRunner== null){
//spill directly
DataInputBuffer key = newDataInputBuffer();
while(spindex < mend &&
kvmeta.get(offsetFor(spindex% maxRec) +PARTITION)== i) {
finalintkvoff = offsetFor(spindex % maxRec);
intkeystart = kvmeta.get(kvoff+ KEYSTART);
intvalstart = kvmeta.get(kvoff+ VALSTART);
key.reset(kvbuffer,keystart, valstart - keystart);
getVBytesForOffset(kvoff,value);
writer.append(key,value);
++spindex;
}
} else{
此时表示配置有combiner处理程序,通过运行combiner中的reduce程序,把数据进行处理后写入。
intspstart = spindex;
while(spindex < mend &&
kvmeta.get(offsetFor(spindex% maxRec)
+ PARTITION)== i) {
++spindex;
}
//Note: we would like to avoid the combiner if we've fewer
//than some threshold of records for a partition
if(spstart != spindex) {
combineCollector.setWriter(writer);
RawKeyValueIteratorkvIter =
newMRResultIterator(spstart, spindex);
combinerRunner.combine(kvIter,combineCollector);
}
}
//close the writer
writer.close();
此处每写入一个partition的数据后。
生成一个针对此partition在文件里的開始位置,写入此partition的长度。
并加入到spillindex中。
//record offsets
rec.startOffset= segmentStart;
rec.rawLength= writer.getRawLength();
rec.partLength= writer.getCompressedLength();
spillRec.putIndex(rec, i);
writer = null;
} finally{
if(null!= writer) writer.close();
}
}
假设splitindex中的cache的数据大于了配置的值,把新生成的spillindex写入index文件。
假设spillindex没有达到配置的值时,全部的spillindex文件存储到内存中,
假设达到了配置的值以后生成的spillindex文件不进行cache。直接写入到文件里。
后期在读取时通过numSpills的值来从文件里读取,
演示样例代码:for(inti = indexCacheList.size();i < numSpills;++i)
如上代码就是从indexCacheList.size開始。由于此时超过了cache的spillindex直接写入到了文件。
把于下的spillindex从文件里读取出来。
if(totalIndexCacheMemory>= indexCacheMemoryLimit){
//create spill index file
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills,partitions
*MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRec.writeToFile(indexFilename,job);
} else{
否则把spillindex加入到indexcache中,并把长度累加起来。
indexCacheList.add(spillRec);
totalIndexCacheMemory+=
spillRec.size() *MAP_OUTPUT_INDEX_RECORD_LENGTH;
}
LOG.info("Finishedspill " + numSpills);
++numSpills;
} finally{
if(out != null)out.close();
}
}
map输出的IFile- spill文件格式:
partition1 |
partition2 |
||||||
keylen(4) |
vallen(4) |
key |
value |
keylen(4) |
vallen(4) |
key |
value |
Map的spill-index文件格式:
partition1 |
partition2 |
||||
SegmentStart( partition的開始位置) |
rawlen(总长度) |
CompressedLength(总长度) |
SegmentStart( partition的開始位置) |
rawlen(总长度) |
CompressedLength(总长度) |
此时,mapper.run函数完毕,运行例如以下操作:output.close操作。
try{
input.initialize(split,mapperContext);
mapper.run(mapperContext);
mapPhase.complete();
setPhase(TaskStatus.Phase.SORT);
statusUpdate(umbilical);
input.close();
input = null;
output.close(mapperContext);
output = null;
}finally{
closeQuietly(input);
closeQuietly(output,mapperContext);
}
此处分析output.close主要分析有reduce的情况,假设没有reduce是直接关闭输出文件。
MapTask.NewOutputCollector.close
调用MapTask.MapOutputBuffer.flush把于下的数据spill到文件。等待SpillThread线程完毕。
运行mergeParts函数合并小的spill文件。
publicvoid close(TaskAttemptContextcontext
) throwsIOException,InterruptedException {
try{
collector.flush();
} catch(ClassNotFoundException cnf) {
thrownewIOException("can't find class ",cnf);
}
collector.close();
}
}
MapOutputBuffer.flush函数操作
publicvoid flush() throwsIOException, ClassNotFoundException,
InterruptedException {
先把buffer中的数据运行sort与spill操作。
LOG.info("Startingflush of map output");
spillLock.lock();
try{
while(spillInProgress){
reporter.progress();
spillDone.await();
}
checkSpillException();
finalintkvbend = 4 * kvend;
if((kvbend + METASIZE)% kvbuffer.length!=
equator- (equator% METASIZE)){
//spill finished
resetSpill();
}
if(kvindex !=kvend) {
kvend= (kvindex+ NMETA)% kvmeta.capacity();
bufend= bufmark;
LOG.info("Spillingmap output");
LOG.info("bufstart= " + bufstart+ "; bufend = "+ bufmark +
";bufvoid = " + bufvoid);
LOG.info("kvstart= " + kvstart+ "("+ (kvstart* 4) +
");kvend = " + kvend+ "("+ (kvend *4) +
");length = " + (distanceTo(kvend,kvstart,
kvmeta.capacity())+ 1) + "/"+ maxRec);
sortAndSpill();
}
} catch(InterruptedException e) {
thrownewIOException("Interrupted whilewaiting for the writer", e);
} finally{
spillLock.unlock();
}
assert!spillLock.isHeldByCurrentThread();
//shut down spill thread and wait for it to exit. Since the preceding
//ensures that it is finished with its work (and sortAndSpill did not
//throw), we elect to use an interrupt instead of setting a flag.
//Spilling simultaneously from this thread while the spill thread
//finishes its work might be both a useful way to extend this and also
//sufficient motivation for the latter approach.
Try{
等待spillthread操作完毕。
spillThread.interrupt();
spillThread.join();
} catch(InterruptedException e) {
thrownewIOException("Spill failed",e);
}
//release sort buffer before the merge
kvbuffer= null;
合并全部的spill小文件。
mergeParts();
Path outputPath =mapOutputFile.getOutputFile();
fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
}
mergeParts函数:
privatevoid mergeParts() throwsIOException, InterruptedException,
ClassNotFoundException{
//get the approximate size of the final output/index files
longfinalOutFileSize = 0;
longfinalIndexFileSize = 0;
finalPath[] filename = newPath[numSpills];
finalTaskAttemptID mapId = getTaskID();
首先得到全部的spill的数据文件。
for(inti = 0; i < numSpills;i++) {
filename[i] =mapOutputFile.getSpillFile(i);
finalOutFileSize +=rfs.getFileStatus(filename[i]).getLen();
}
假设仅仅有一个spill文件,直接把生成的spill数据文件与索引文件生成为map的输出文件
说得坦白点就是把spill文件直接rename到目标map的output路径下
if(numSpills== 1) { //the spill is the final output
sameVolRename(filename[0],
mapOutputFile.getOutputFileForWriteInVolume(filename[0]));
if(indexCacheList.size()== 0) {
sameVolRename(mapOutputFile.getSpillIndexFile(0),
mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));
} else{
indexCacheList.get(0).writeToFile(
mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]),job);
}
sortPhase.complete();
return;
}
假设包括多个spill文件,先读取未被cache到内存部分的索引文件(spillindex)
//read in paged indices
for(inti = indexCacheList.size();i < numSpills;++i) {
Path indexFileName =mapOutputFile.getSpillIndexFile(i);
indexCacheList.add(newSpillRecord(indexFileName, job));
}
//makecorrection in the length to include the sequence file header
//lengthsfor each partition
finalOutFileSize += partitions* APPROX_HEADER_LENGTH;
finalIndexFileSize = partitions* MAP_OUTPUT_INDEX_RECORD_LENGTH;
生成相应的output文件与outputindex文件。Index中记录有partition的開始位置与长度
在job中的attempid文件夹下生成一个file.out文件是数据文件的输出
在job中的attempid文件夹下生成一个file.out.index文件是数据索引文件
Path finalOutputFile =
mapOutputFile.getOutputFileForWrite(finalOutFileSize);
Path finalIndexFile =
mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
//Theoutput stream for the final single output file
FSDataOutputStream finalOut =rfs.create(finalOutputFile,true,4096);
假设numSpills个数为0表示没有生成输出文件,此时生成一个空的数据文件。并生成一个索引文件,
此索引文件里每个partition的索引都为0
if(numSpills== 0) {
//createdummy files
IndexRecord rec = newIndexRecord();
SpillRecord sr = newSpillRecord(partitions);
try{
for(inti = 0; i < partitions;i++) {
longsegmentStart = finalOut.getPos();
Writer<K, V> writer =
newWriter<K, V>(job,finalOut, keyClass,valClass,codec,null);
writer.close();
rec.startOffset= segmentStart;
rec.rawLength= writer.getRawLength();
rec.partLength= writer.getCompressedLength();
sr.putIndex(rec, i);
}
sr.writeToFile(finalIndexFile,job);
} finally{
finalOut.close();
}
sortPhase.complete();
return;
}
{
sortPhase.addPhases(partitions);// Divide sort phase into sub-phases
IndexRecord rec = newIndexRecord();
finalSpillRecord spillRec = newSpillRecord(partitions);
此时,从最小的partition開始合并全部的小的spill文件
for(intparts = 0; parts < partitions;parts++) {
//createthe segments to be merged
List<Segment<K,V>>segmentList =
newArrayList<Segment<K, V>>(numSpills);
此处開始迭代全部的spill数据文件。得到spill文件里相应的partition的segment,
加入到一个集合容器中(此时通过每个spill文件相应的index能够拿到segment在文件里的位置)
for(inti = 0; i < numSpills;i++) {
IndexRecord indexRecord =indexCacheList.get(i).getIndex(parts);
Segment<K,V> s =
newSegment<K,V>(job,rfs,filename[i], indexRecord.startOffset,
indexRecord.partLength,codec,true);
segmentList.add(i, s);
if(LOG.isDebugEnabled()){
LOG.debug("MapId="+ mapId + " Reducer="+ parts +
"Spill=" + i + "("+ indexRecord.startOffset+ ","+
indexRecord.rawLength+ ", "+ indexRecord.partLength+ ")");
}
}
读取merge因子,通过mapreduce.task.io.sort.factor配置。
默觉得100
intmergeFactor = job.getInt(JobContext.IO_SORT_FACTOR,100);
//sort the segments only if there are intermediate merges
booleansortSegments = segmentList.size() > mergeFactor;
//merge
@SuppressWarnings("unchecked")
生成一个Merger.MergeQueue队列,依据全部此partition中的segments,
假设当前的spill文件个数超过了配置的merge因子的个数。把segment按文件大小从小到大排序。
RawKeyValueIterator kvIter =Merger.merge(job,rfs,
keyClass,valClass,codec,
segmentList,mergeFactor,
newPath(mapId.toString()),
job.getOutputKeyComparator(),reporter,sortSegments,
null,spilledRecordsCounter,sortPhase.phase(),
TaskType.MAP);
生成Writer实例。
//writemerged output to disk
longsegmentStart = finalOut.getPos();
Writer<K, V> writer =
newWriter<K, V>(job,finalOut, keyClass,valClass,codec,
spilledRecordsCounter);
假设combiner没有配置。
或者spill文件的个数还不达到mapreduce.map.combine.minspills配置的个数,默觉得3个
不运行combiner操作。
直接写入文件。
if(combinerRunner== null|| numSpills< minSpillsForCombine){
Merger.writeFile(kvIter,writer, reporter,job);
} else{
否则运行combiner操作并写入文件。combiner事实上能够理解为没有shuffle的reduce
combineCollector.setWriter(writer);
combinerRunner.combine(kvIter,combineCollector);
}
提示:Merger.MergeQueue队列中每next去读取一条记录,
就会从全部的segment中读取出最小的一个kv,并写入此kv的值,
去运行next操作把全部的segment都放到一个优先级堆中,通过优先堆排序取出最小的一个kv.
//close
writer.close();
sortPhase.startNextPhase();
记录当前partition的索引信息。
//record offsets
rec.startOffset= segmentStart;
rec.rawLength= writer.getRawLength();
rec.partLength= writer.getCompressedLength();
spillRec.putIndex(rec, parts);
}
全部partition合并完毕后。写入索引文件。
并删除spill的小数据文件。
spillRec.writeToFile(finalIndexFile,job);
finalOut.close();
for(inti = 0; i < numSpills;i++) {
rfs.delete(filename[i],true);
}
}
}
结束语:每一个spill文件写入时会运行高速排序(内存中)与combiner操作,
最后多个spill合并时使用外部排序(磁盘)来对文件进行比較并取出最小的kv,写入文件,
此时假设spill文件的个数超过配置的值时。会再做一次combiner操作。
版权声明:本文博客原创文章,博客,未经同意,不得转载。