Class<? extends ShuffleConsumerPlugin> clazz = job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
ShuffleConsumerPlugin.Context shuffleContext = new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical, super.lDirAlloc, reporter, codec, combinerClass, combineCollector, spilledRecordsCounter, reduceCombineInputCounter, shuffledMapsCounter, reduceShuffleBytes, failedShuffleCounter, mergedMapOutputsCounter, taskStatus, copyPhase, sortPhase, this, mapOutputFile, localMapFiles);
shuffleConsumerPlugin.init(shuffleContext);
The plugin's run function then performs the shuffle and returns a RawKeyValueIterator instance:
rIter = shuffleConsumerPlugin.run();
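Because the class is looked up through MRConfig.SHUFFLE_CONSUMER_PLUGIN, a custom plugin can be swapped in purely through configuration. A minimal sketch, assuming Hadoop 2.x where this key resolves to mapreduce.job.reduce.shuffle.consumer.plugin.class; MyShufflePlugin is a hypothetical class:

// Hypothetical example: MyShufflePlugin must implement
// org.apache.hadoop.mapred.ShuffleConsumerPlugin.
JobConf conf = new JobConf();
conf.setClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, MyShufflePlugin.class, ShuffleConsumerPlugin.class);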
The Shuffle.run function is defined as follows:
...
int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH, MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
A map-completion status fetcher thread is created and started next. Roughly once per second, it fetches from the AM the events of all finished maps in this job and, through the ShuffleSchedulerImpl instance, records each finished map's host, map ID and so on into the mapLocations container.
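As a rough worked example, assuming the Hadoop 2.x constants in Shuffle.java (MIN_EVENTS_TO_FETCH = 100, MAX_EVENTS_TO_FETCH = 10000, MAX_RPC_OUTSTANDING_EVENTS = 3000000; worth verifying against your version), a job with 1000 reduce tasks gets:

// eventsPerReducer = max(100, 3000000 / 1000) = 3000
// maxEventsToFetch = min(10000, 3000) = 3000, i.e. up to 3000 events per RPC
int eventsPerReducer = Math.max(100, 3000000 / 1000);
int maxEventsToFetch = Math.min(10000, eventsPerReducer);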
// Start the map-completion events fetcher thread
final EventFetcher<K, V> eventFetcher = new EventFetcher<K, V>(reduceId, umbilical, scheduler, this, maxEventsToFetch);
eventFetcher.start();
Now look at how EventFetcher.run executes; only the main body of the code is kept below.
EventFetcher.run:
public void run() {
  int failures = 0;
  ...
  int numNewMaps = getMapCompletionEvents();
  ...
}
EventFetcher.getMapCompletionEvents:
...
MapTaskCompletionEventsUpdate update = umbilical.getMapCompletionEvents((org.apache.hadoop.mapred.JobID) reduce.getJobID(), fromEventIdx, maxEventsToFetch, (org.apache.hadoop.mapred.TaskAttemptID) reduce);
events = update.getMapTaskCompletionEvents();
...
for (TaskCompletionEvent event : events) {
  scheduler.resolve(event);
  if (TaskCompletionEvent.Status.SUCCEEDED == event.getTaskStatus()) {
    ++numNewMaps;
  }
}
Here scheduler is an instance of ShuffleSchedulerImpl. ShuffleSchedulerImpl.resolve:
case SUCCEEDED:
  URI u = getBaseURI(reduceId, event.getTaskTrackerHttp());
  addKnownMapOutput(u.getHost() + ":" + u.getPort(), u.toString(), event.getTaskAttemptId());
  maxMapRuntime = Math.max(maxMapRuntime, event.getTaskRunTime());
  break;
...
The ShuffleSchedulerImpl.addKnownMapOutput function records the map ID and its host in the mapLocations container:
MapHost host = mapLocations.get(hostName);
if (host == null) {
  host = new MapHost(hostName, hostUrl);
  mapLocations.put(hostName, host);
}
host.addKnownMap(mapId);
addKnownMap moves the host into the PENDING state, so the host is also added to the pendingHosts container and the Fetcher file-copy threads waiting on it are notified:
// Mark the host as pending
if (host.getState() == State.PENDING) {
  pendingHosts.add(host);
  notifyAll();
}
...
Back in Shuffle.run, execution continues:
// Start the map-output fetcher threads
boolean isLocal = localMapFiles != null;
The number of threads that fetch map data comes from mapreduce.reduce.shuffle.parallelcopies (default 5). The connect timeout is configured by mapreduce.reduce.shuffle.connect.timeout (default 180000 ms) and the read timeout by mapreduce.reduce.shuffle.read.timeout (default 180000 ms). The Fetcher thread instances are created and started:
final int numFetchers = isLocal ? 1 : jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
Fetcher<K, V>[] fetchers = new Fetcher[numFetchers];
if (isLocal) {
  fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler, merger, reporter, metrics, this, reduceTask.getShuffleSecret(), localMapFiles);
  fetchers[0].start();
} else {
  for (int i = 0; i < numFetchers; ++i) {
    fetchers[i] = new Fetcher<K, V>(jobConf, reduceId, scheduler, merger, reporter, metrics, this, reduceTask.getShuffleSecret());
    fetchers[i].start();
  }
}
...
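For reference, these tunables can be adjusted per job; a small sketch using the property names quoted above (the values are illustrative, not recommendations):

JobConf conf = new JobConf();
conf.setInt("mapreduce.reduce.shuffle.parallelcopies", 10);       // default 5
conf.setInt("mapreduce.reduce.shuffle.connect.timeout", 180000);  // ms, default 180000
conf.setInt("mapreduce.reduce.shuffle.read.timeout", 180000);     // ms, default 180000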
Now into the Fetcher threads; Fetcher.run works as follows:
...
MapHost host = null;
try {
  // If merge is on, block
  merger.waitForResource();
  // Get a host to shuffle from: take a MapHost instance from the ShuffleScheduler
  host = scheduler.getHost();
  metrics.threadBusy();
  // Shuffle
  copyFromHost(host);
} finally {
  if (host != null) {
    scheduler.freeHost(host);
    metrics.threadFree();
  }
}
Next, the getHost function in ShuffleScheduler: if pendingHosts is empty, the thread first waits until the EventFetcher thread fetches data and notifies it:
while (pendingHosts.isEmpty()) {
  wait();
}
MapHost host = null;
Iterator<MapHost> iter = pendingHosts.iterator();
int numToPick = random.nextInt(pendingHosts.size());
for (int i = 0; i <= numToPick; ++i) {
  host = iter.next();
}
A MapHost is picked at random from pendingHosts and returned to the caller.
pendingHosts.remove(host);
...
Once a MapHost is obtained, copyFromHost performs the data copy. At this point the URL for a task host looks roughly like host:port/mapOutput?job=xxx&reduce=123(this reduce's partition id)&map=... Part of the copyFromHost code:
...
List<TaskAttemptID> maps = scheduler.getMapsForHost(host);
...
Set<TaskAttemptID> remaining = new HashSet<TaskAttemptID>(maps);
...
After this step, the map= parameter of the URL carries many map IDs, separated by commas:
URL url = getMapOutputURL(host, maps);
An HTTP connection is opened for this URL; if mapreduce.shuffle.ssl.enabled is set to true (default false), an SSL connection is used: openConnection(url);
Then the connect timeout, headers and read timeout are set, and the HttpURLConnection is opened:
// put url hash into http header
connection.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
// set the read timeout
connection.setReadTimeout(readTimeout);
// put shuffle version into http header
connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
connect(connection, connectionTimeout);
...
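For illustration only, a hedged reconstruction of the URL shape described above (the host:port, job ID and attempt IDs below are made up; exact formatting may differ by version; 13562 is the usual default shuffle port):

// Roughly what getMapOutputURL produces for one host.
String hostPort = "node01:13562";
StringBuilder u = new StringBuilder("http://").append(hostPort).append("/mapOutput");
u.append("?job=").append("job_1400000000000_0001");
u.append("&reduce=").append(123);  // this reduce's partition id
u.append("&map=").append("attempt_..._m_000000_0,attempt_..._m_000001_0");  // comma-separated map IDs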
The file copy then runs iteratively: each pass reads one map's output and removes one entry from remaining, until everything in remaining has been fetched:
TaskAttemptID[] failedTasks = null;
while (!remaining.isEmpty() && failedTasks == null) {
  failedTasks = copyMapOutput(host, input, remaining);
}
For each map ID, copyMapOutput first calls MergeManagerImpl's reserve function, which 1. checks whether the map's output exceeds the size configured by mapreduce.reduce.memory.totalbytes; by default this is the current Runtime's maxMemory multiplied by mapreduce.reduce.shuffle.input.buffer.percent (default 0.90). If the map output exceeds this size, an OnDiskMapOutput instance is created; 2. if it does not, an InMemoryMapOutput instance is created. The reserve function called first inside copyMapOutput:
if (!canShuffleToMemory(requestedSize)) {
  ...
  return new OnDiskMapOutput<K, V>(mapId, reduceId, this, requestedSize, jobConf, mapOutputFile, fetcher, true);
}
...
if (usedMemory > memoryLimit) {
  // The memory in use already exceeds the configured limit; null is returned,
  // and the host is re-added to the shuffleScheduler's pendingHosts queue.
  ...
  return null;
}
return unconditionalReserve(mapId, requestedSize, true);
unconditionalReserve creates an InMemoryMapOutput and adds the map output's size to usedMemory:
private synchronized InMemoryMapOutput<K, V> unconditionalReserve(TaskAttemptID mapId, long requestedSize, boolean primaryMapOutput) {
  usedMemory += requestedSize;
  return new InMemoryMapOutput<K, V>(jobConf, mapId, this, (int) requestedSize, codec, primaryMapOutput);
}
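To make reserve's decision concrete, here is a sketch of the budget arithmetic using the defaults this article cites. The single-shuffle fraction (mapreduce.reduce.shuffle.memory.limit.percent, default 0.25, used by canShuffleToMemory) is my reading of MergeManagerImpl and worth double-checking against your version:

long maxMemory = Runtime.getRuntime().maxMemory();
long memoryLimit = (long) (maxMemory * 0.90);              // mapreduce.reduce.shuffle.input.buffer.percent
long maxSingleShuffleLimit = (long) (memoryLimit * 0.25);  // mapreduce.reduce.shuffle.memory.limit.percent
// reserve(): requestedSize >= maxSingleShuffleLimit -> OnDiskMapOutput
//            usedMemory > memoryLimit               -> null (fetcher WAITs, host re-queued)
//            otherwise                              -> InMemoryMapOutput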
Below is the handling when usedMemory has exceeded the limit and the host must be re-queued. In copyMapOutput:
if (mapOutput == null) {
  LOG.info("fetcher#" + id + " - MergeManager returned status WAIT ...");
  // Not an error but wait to process data.
  return EMPTY_ATTEMPT_ID_ARRAY;
}
The host still has unprocessed map output at this point, so Fetcher.run re-adds it to the queue:
if (host != null) {
  scheduler.freeHost(host);
  metrics.threadFree();
}
...
Still in copyMapOutput, the shuffle method of the MapOutput instance returned by merger.reserve is invoked. For an InMemoryMapOutput, shuffle writes the map output straight into memory; for an OnDiskMapOutput, it writes the map output into a local temporary file.
...
Finally, ShuffleScheduler.copySucceeded completes the file copy and calls mapOutput.commit, and the processed map ID is removed from remaining:
scheduler.copySucceeded(mapId, host, compressedLength, endTime - startTime, mapOutput);
Now look at the MapOutput.commit functions. a. InMemoryMapOutput.commit:
public void commit() throws IOException {
  merger.closeInMemoryFile(this);
}
This calls MergeManagerImpl.closeInMemoryFile, which adds the mapOutput instance to the inMemoryMapOutputs list and adds its size to commitMemory:
public synchronized void closeInMemoryFile(InMemoryMapOutput<K, V> mapOutput) {
  inMemoryMapOutputs.add(mapOutput);
  LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize() + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size() + ", commitMemory -> " + commitMemory + ", usedMemory -> " + usedMemory);
  commitMemory += mapOutput.getSize();
It then checks whether the merge threshold has been reached. The threshold is mapreduce.reduce.memory.totalbytes multiplied by mapreduce.reduce.shuffle.merge.percent, by default the current Runtime's max memory * 0.90 * 0.90. In other words, as long as new map outputs keep arriving, this condition is bound to be met eventually:
  // Can hang if mergeThreshold is really low.
  if (commitMemory >= mergeThreshold) {
    ...
    // gather the map outputs already involved in merging and start a merge over all of them
    inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
    inMemoryMergedMapOutputs.clear();
    inMemoryMerger.startMerge(inMemoryMapOutputs);
    commitMemory = 0L; // Reset commitMemory.
  }
If mapreduce.reduce.merge.memtomem.enabled is true (default false) and the number of map outputs in inMemoryMapOutputs reaches mapreduce.reduce.merge.memtomem.threshold (whose default is the value of mapreduce.task.io.sort.factor, itself 10 by default), a memory-to-memory merge is started:
  if (memToMemMerger != null) {
    if (inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) {
      memToMemMerger.startMerge(inMemoryMapOutputs);
    }
  }
}
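A numeric sketch of the merge trigger, again using the 0.90 defaults the article cites and assuming a 1 GB reduce heap:

long maxMemory = 1024L * 1024 * 1024;               // 1 GB heap
long memoryLimit = (long) (maxMemory * 0.90);       // ~921 MB shuffle budget
long mergeThreshold = (long) (memoryLimit * 0.90);  // ~829 MB in-memory merge trigger
// Once commitMemory >= mergeThreshold, startMerge fires and commitMemory resets to 0.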
MergeManagerImpl.InMemoryMerger's merge operation: after inMemoryMerger.startMerge(inMemoryMapOutputs) executes, this thread is notified and its merge function runs:
public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
  if (inputs == null || inputs.size() == 0) {
    return;
  }
  ...
  TaskAttemptID mapId = inputs.get(0).getMapId();
  TaskID mapTaskId = mapId.getTaskID();
  List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
createInMemorySegments wraps the inputs in InMemoryReader instances, clears the passed-in list, and puts the resulting segments into inMemorySegments. An output file path is then generated, and a Writer instance is created for the temporary output file:
  long mergeOutputSize = createInMemorySegments(inputs, inMemorySegments, 0);
  int noInMemorySegments = inMemorySegments.size();
  Path outputPath = mapOutputFile.getInputFileForWrite(mapTaskId, mergeOutputSize).suffix(Task.MERGED_OUTPUT_PREFIX);
  Writer<K, V> writer = new Writer<K, V>(jobConf, rfs, outputPath, (Class<K>) jobConf.getMapOutputKeyClass(), (Class<V>) jobConf.getMapOutputValueClass(), codec, null);
  RawKeyValueIterator rIter = null;
  CompressAwarePath compressAwarePath;
  try {
    LOG.info("Initiating in-memory merge with " + noInMemorySegments + " segments...");
This part is much like the map-side output: Merger.merge returns an iterator over the segment files backed by a priority heap, so each next() yields the smallest key and value across all segments. If there is no combiner, the data is written straight to the file; if there is one, it runs first:
    rIter = Merger.merge(jobConf, rfs, (Class<K>) jobConf.getMapOutputKeyClass(), (Class<V>) jobConf.getMapOutputValueClass(), inMemorySegments, inMemorySegments.size(), new Path(reduceId.toString()), (RawComparator<K>) jobConf.getOutputKeyComparator(), reporter, spilledRecordsCounter, null, null);
    if (null == combinerClass) {
      Merger.writeFile(rIter, writer, reporter, jobConf);
    } else {
      combineCollector.setWriter(writer);
      combineAndSpill(rIter, reduceCombineInputCounter);
    }
    writer.close();
Here is where this differs from the map-side output: no spill index file is written; instead a CompressAwarePath is created to record the output path and sizes:
    compressAwarePath = new CompressAwarePath(outputPath, writer.getRawLength(), writer.getCompressedLength());
    LOG.info(reduceId + " Merge of the " + noInMemorySegments + " files in-memory complete." + " Local file is " + outputPath + " of size " + localFS.getFileStatus(outputPath).getLen());
  } catch (IOException e) {
    // make sure that we delete the ondisk file that we created
    // earlier when we invoked cloneFileAttributes
    localFS.delete(outputPath, true);
    throw e;
  }
Finally, the generated file is added to the onDiskMapOutputs property; if enough files have accumulated there (relative to mapreduce.task.io.sort.factor), a disk merge is started:
  // Note the output of the merge
  closeOnDiskFile(compressAwarePath);
}
The closeOnDiskFile called on the last line above is defined as:
public synchronized void closeOnDiskFile(CompressAwarePath file) {
  onDiskMapOutputs.add(file);
  if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
    onDiskMerger.startMerge(onDiskMapOutputs);
  }
}
With the default mapreduce.task.io.sort.factor of 10, for example, the on-disk merge fires once 2 * 10 - 1 = 19 files have accumulated.
b. OnDiskMapOutput.commit renames the tmp file into the target directory, creates a CompressAwarePath instance, and calls the handler described above:
public void commit() throws IOException {
  fs.rename(tmpOutputPath, outputPath);
  CompressAwarePath compressAwarePath = new CompressAwarePath(outputPath, getSize(), this.compressedSize);
  merger.closeOnDiskFile(compressAwarePath);
}
MergeManagerImpl.OnDiskMerger's merge function: by now there is little new to explain here; just note that after each merge, the merged file's path is added back into the onDiskMapOutputs container:
public void merge(List<CompressAwarePath> inputs) throws IOException {
  // sanity check
  if (inputs == null || inputs.isEmpty()) {
    LOG.info("No ondisk files to merge...");
    return;
  }
  long approxOutputSize = 0;
  int bytesPerSum = jobConf.getInt("io.bytes.per.checksum", 512);
  LOG.info("OnDiskMerger: We have " + inputs.size() + " map outputs on disk. Triggering merge...");
  // 1. Prepare the list of files to be merged.
  for (CompressAwarePath file : inputs) {
    approxOutputSize += localFS.getFileStatus(file).getLen();
  }
  // add the checksum length
  approxOutputSize += ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
  // 2. Start the on-disk merge process
  Path outputPath = localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(), approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
  Writer<K, V> writer = new Writer<K, V>(jobConf, rfs, outputPath, (Class<K>) jobConf.getMapOutputKeyClass(), (Class<V>) jobConf.getMapOutputValueClass(), codec, null);
  RawKeyValueIterator iter = null;
  CompressAwarePath compressAwarePath;
  Path tmpDir = new Path(reduceId.toString());
  try {
    iter = Merger.merge(jobConf, rfs, (Class<K>) jobConf.getMapOutputKeyClass(), (Class<V>) jobConf.getMapOutputValueClass(), codec, inputs.toArray(new Path[inputs.size()]), true, ioSortFactor, tmpDir, (RawComparator<K>) jobConf.getOutputKeyComparator(), reporter, spilledRecordsCounter, null, mergedMapOutputsCounter, null);
    Merger.writeFile(iter, writer, reporter, jobConf);
    writer.close();
    compressAwarePath = new CompressAwarePath(outputPath, writer.getRawLength(), writer.getCompressedLength());
  } catch (IOException e) {
    localFS.delete(outputPath, true);
    throw e;
  }
  closeOnDiskFile(compressAwarePath);
  LOG.info(reduceId + " Finished merging " + inputs.size() + " map output files on disk of total-size " + approxOutputSize + "." + " Local output file is " + outputPath + " of size " + localFS.getFileStatus(outputPath).getLen());
}
OK, the map copy part is now complete. Back in ShuffleConsumerPlugin's run method (that is, Shuffle's run method), the analysis continues from the code above. First it waits for all copy operations to finish:
// Wait for shuffle to complete successfully
while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
  reporter.progress();
  synchronized (this) {
    if (throwable != null) {
      throw new ShuffleError("error in shuffle in " + throwingThreadName, throwable);
    }
  }
}
Reaching the next line means all map copy operations are done, so the thread that polls map completion status and the copy threads are shut down:
// Stop the event-fetcher thread
eventFetcher.shutDown();
// Stop the map-output fetcher threads
for (Fetcher<K, V> fetcher : fetchers) {
  fetcher.shutDown();
}
// stop the scheduler
scheduler.close();
A status update is sent to the AM: the sort phase is about to run:
copyPhase.complete(); // copy is already complete
taskStatus.setPhase(TaskStatus.Phase.SORT);
reduceTask.statusUpdate(umbilical);
The final merge is then performed; merging all the files and the in-memory data also sorts them:
// Finish the on-going merges...
RawKeyValueIterator kvIter = null;
try {
  kvIter = merger.close();
} catch (Throwable e) {
  throw new ShuffleError("Error while doing final merge ", e);
}
// Sanity check
synchronized (this) {
  if (throwable != null) {
    throw new ShuffleError("error in shuffle in " + throwingThreadName, throwable);
  }
}
Finally, the merged iterator instance is returned:
return kvIter;
merger here is the MergeManagerImpl instance; its close function shuts down the merge threads (waiting for in-flight merges to finish) and then runs the final merge:
public RawKeyValueIterator close() throws Throwable {
  // Wait for on-going merges to complete
  if (memToMemMerger != null) {
    memToMemMerger.close();
  }
  inMemoryMerger.close();
  onDiskMerger.close();
  List<InMemoryMapOutput<K, V>> memory = new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
  inMemoryMergedMapOutputs.clear();
  memory.addAll(inMemoryMapOutputs);
  inMemoryMapOutputs.clear();
  List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
  onDiskMapOutputs.clear();
  return finalMerge(jobConf, rfs, memory, disk);
}
The last merge operation, finalMerge:
private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs, List<InMemoryMapOutput<K, V>> inMemoryMapOutputs, List<CompressAwarePath> onDiskMapOutputs) throws IOException {
  LOG.info("finalMerge called with " + inMemoryMapOutputs.size() + " in-memory map-outputs and " + onDiskMapOutputs.size() + " on-disk map-outputs");
  final float maxRedPer = job.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0f);
  if (maxRedPer > 1.0 || maxRedPer < 0.0) {
    throw new IOException(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT + maxRedPer);
  }
The amount of map output that may stay cached in memory is computed next; the ratio is configured by mapreduce.reduce.input.buffer.percent:
  int maxInMemReduce = (int) Math.min(Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE);
  // merge config params
  Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
  Class<V> valueClass = (Class<V>) job.getMapOutputValueClass();
  boolean keepInputs = job.getKeepFailedTaskFiles();
  final Path tmpDir = new Path(reduceId.toString());
  final RawComparator<K> comparator = (RawComparator<K>) job.getOutputKeyComparator();
  // segments required to vacate memory
  List<Segment<K, V>> memDiskSegments = new ArrayList<Segment<K, V>>();
  long inMemToDiskBytes = 0;
  boolean mergePhaseFinished = false;
  if (inMemoryMapOutputs.size() > 0) {
    TaskID mapId = inMemoryMapOutputs.get(0).getMapId().getTaskID();
Based on the amount that may stay cached in memory, createInMemorySegments wraps whatever cannot stay cached in InMemoryReader instances and adds them to the memDiskSegments container:
    inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs, memDiskSegments, maxInMemReduce);
    final int numMemDiskSegments = memDiskSegments.size();
The excess in-memory map output data is written to a file, whose path is added to the onDiskMapOutputs container:
    if (numMemDiskSegments > 0 && ioSortFactor > onDiskMapOutputs.size()) {
      ...
      // this branch mainly spills the excess in-memory map output to disk
      mergePhaseFinished = true;
      // must spill to disk, but can't retain in-mem for intermediate merge
      final Path outputPath = mapOutputFile.getInputFileForWrite(mapId, inMemToDiskBytes).suffix(Task.MERGED_OUTPUT_PREFIX);
      final RawKeyValueIterator rIter = Merger.merge(job, fs, keyClass, valueClass, memDiskSegments, numMemDiskSegments, tmpDir, comparator, reporter, spilledRecordsCounter, null, mergePhase);
      Writer<K, V> writer = new Writer<K, V>(job, fs, outputPath, keyClass, valueClass, codec, null);
      try {
        Merger.writeFile(rIter, writer, reporter, job);
        writer.close();
        onDiskMapOutputs.add(new CompressAwarePath(outputPath, writer.getRawLength(), writer.getCompressedLength()));
        writer = null;
        // add to list of final disk outputs.
      } catch (IOException e) {
        if (null != outputPath) {
          try {
            fs.delete(outputPath, true);
          } catch (IOException ie) {
            // NOTHING
          }
        }
        throw e;
      } finally {
        if (null != writer) {
          writer.close();
        }
      }
      LOG.info("Merged " + numMemDiskSegments + " segments, " + inMemToDiskBytes + " bytes to disk to satisfy " + "reduce memory limit");
      inMemToDiskBytes = 0;
      memDiskSegments.clear();
    } else if (inMemToDiskBytes != 0) {
      LOG.info("Keeping " + numMemDiskSegments + " segments, " + inMemToDiskBytes + " bytes in memory for " + "intermediate, on-disk merge");
    }
  }
  // segments on disk
  List<Segment<K, V>> diskSegments = new ArrayList<Segment<K, V>>();
  long onDiskBytes = inMemToDiskBytes;
  long rawBytes = inMemToDiskBytes;
An onDisk array is built from all map output paths currently on disk; each file the reduce side has received and stored is wrapped in a Segment and added to the diskSegments container:
  CompressAwarePath[] onDisk = onDiskMapOutputs.toArray(new CompressAwarePath[onDiskMapOutputs.size()]);
  for (CompressAwarePath file : onDisk) {
    long fileLength = fs.getFileStatus(file).getLen();
    onDiskBytes += fileLength;
    rawBytes += (file.getRawDataLength() > 0) ? file.getRawDataLength() : fileLength;
    LOG.debug("Disk file: " + file + " Length is " + fileLength);
    diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs, (file.toString().endsWith(Task.MERGED_OUTPUT_PREFIX) ? null : mergedMapOutputsCounter), file.getRawDataLength()));
  }
  LOG.info("Merging " + onDisk.length + " files, " + onDiskBytes + " bytes from disk");
diskSegments is then sorted by content size, smallest first:
  Collections.sort(diskSegments, new Comparator<Segment<K, V>>() {
    public int compare(Segment<K, V> o1, Segment<K, V> o2) {
      if (o1.getLength() == o2.getLength()) {
        return 0;
      }
      return o1.getLength() < o2.getLength() ? -1 : 1;
    }
  });
All map output content still in memory is wrapped in segments and added to the finalSegments container:
  // build final list of segments from merged backed by disk + in-mem
  List<Segment<K, V>> finalSegments = new ArrayList<Segment<K, V>>();
  long inMemBytes = createInMemorySegments(inMemoryMapOutputs, finalSegments, 0);
  LOG.info("Merging " + finalSegments.size() + " segments, " + inMemBytes + " bytes from memory into reduce");
  if (0 != onDiskBytes) {
    final int numInMemSegments = memDiskSegments.size();
    diskSegments.addAll(0, memDiskSegments);
    memDiskSegments.clear();
    // Pass mergePhase only if there is a going to be intermediate
    // merges. See comment where mergePhaseFinished is being set
    Progress thisPhase = (mergePhaseFinished) ? null : mergePhase;
Here an iterator is built over the map output currently on disk:
    RawKeyValueIterator diskMerge = Merger.merge(job, fs, keyClass, valueClass, codec, diskSegments, ioSortFactor, numInMemSegments, tmpDir, comparator, reporter, false, spilledRecordsCounter, null, thisPhase);
    diskSegments.clear();
    if (0 == finalSegments.size()) {
      return diskMerge;
    }
The disk iterator is also added to finalSegments, which now effectively holds two heap-sorted queues; each next() must pick the smallest k/v from memory and disk:
    finalSegments.add(new Segment<K, V>(new RawKVIteratorReader(diskMerge, onDiskBytes), true, rawBytes));
  }
  return Merger.merge(job, fs, keyClass, valueClass, finalSegments, finalSegments.size(), tmpDir, comparator, reporter, spilledRecordsCounter, null, null);
}
The shuffle part is now completely done. Back in ReduceTask.run, the analysis continues down the code:
rIter = shuffleConsumerPlugin.run();
...
RawComparator comparator = job.getOutputValueGroupingComparator();
if (useNewApi) {
  runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);
} else {
  runOldReducer...
}
In the code above, runNewReducer mainly drives the reducer's run function:
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(), reporter);
// make a reducer
org.apache.hadoop.mapreduce.Reducer<INKEY, INVALUE, OUTKEY, OUTVALUE> reducer = (org.apache.hadoop.mapreduce.Reducer<INKEY, INVALUE, OUTKEY, OUTVALUE>) ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
org.apache.hadoop.mapreduce.RecordWriter<OUTKEY, OUTVALUE> trackedRW = new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);
job.setBoolean("mapred.skip.on", isSkipping());
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
org.apache.hadoop.mapreduce.Reducer.Context reducerContext = createReduceContext(reducer, job, getTaskID(), rIter, reduceInputKeyCounter, reduceInputValueCounter, trackedRW, committer, reporter, comparator, keyClass, valueClass);
try {
  reducer.run(reducerContext);
} finally {
  trackedRW.close(reducerContext);
}
The code above creates the Context the Reducer runs in and executes reducer.run. Part of the createReduceContext definition:
org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE> reduceContext = new ReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, taskId, rIter, inputKeyCounter, inputValueCounter, output, committer, reporter, comparator, keyClass, valueClass);
org.apache.hadoop.mapreduce.Reducer<INKEY, INVALUE, OUTKEY, OUTVALUE>.Context reducerContext = new WrappedReducer<INKEY, INVALUE, OUTKEY, OUTVALUE>().getReducerContext(reduceContext);
ReduceContextImpl mainly implements reading data from the RawKeyValueIterator. Reducer.run:
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    while (context.nextKey()) {
      reduce(context.getCurrentKey(), context.getValues(), context);
      // If a back up store is used, reset it
      Iterator<VALUEIN> iter = context.getValues().iterator();
      if (iter instanceof ReduceContext.ValueIterator) {
        ((ReduceContext.ValueIterator<VALUEIN>) iter).resetBackupStore();
      }
    }
  } finally {
    cleanup(context);
  }
}
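For reference, a minimal user-side Reducer that this run loop drives (standard MapReduce API, not specific to this source walkthrough):

public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  private final IntWritable result = new IntWritable();
  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable v : values) {  // each element comes from the ValueIterator analyzed below
      sum += v.get();
    }
    result.set(sum);
    context.write(key, result);
  }
}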
In run, context.nextKey() obtains the next key's worth of data; this is mostly implemented in ReduceContextImpl, where nextKey calls the nextKeyValue function:
public boolean nextKeyValue() throws IOException, InterruptedException {
  if (!hasMore) {
    key = null;
    value = null;
    return false;
  }
The next line determines whether we are at the first value under a key: for a first value, nextKeyIsSame is false. In other words, nextKeyIsSame == true means the record coming next has the same key as the current one; otherwise a new key group has started:
  firstValue = !nextKeyIsSame;
The RawKeyValueIterator (the queue inside Merger) is consulted for the currently smallest key, which is copied into a buffer so that keyDeserializer can read the key's value from it:
  DataInputBuffer nextKey = input.getKey();
  currentRawKey.set(nextKey.getData(), nextKey.getPosition(), nextKey.getLength() - nextKey.getPosition());
  buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
The key's value is then read from the buffer and stored into key. Note how this is set up (elsewhere in the class): a key Deserializer instance is created, with the buffer serving as its InputStream:
  this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
  this.keyDeserializer.open(buffer);
The definition of the deserializer's deserialize function shows that only one instance is ever created per key/value; this is a performance choice to reduce object creation. Each record is produced by calling readFields on the same Writable, which is why holding on to value references inside reduce gives wrong results: the object stays the same while its contents are overwritten, so every saved reference ends up showing the last value.
public Writable deserialize(Writable w) throws IOException {
  Writable writable;
  if (w == null) {
    writable = (Writable) ReflectionUtils.newInstance(writableClass, getConf());
  } else {
    writable = w;
  }
  writable.readFields(dataIn);
  return writable;
}
...
Back in nextKeyValue, the key content is read, and the current value is obtained the same way as the key:
  key = keyDeserializer.deserialize(key);
  DataInputBuffer nextVal = input.getValue();
buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength() - nextVal.getPosition());
value = valueDeserializer.deserialize(value);
currentKeyLength = nextKey.getLength() - nextKey.getPosition();
currentValueLength = nextVal.getLength() - nextVal.getPosition();
isMarked is false here, and the backupStore property is null:
  if (isMarked) {
    backupStore.write(nextKey, nextVal);
  }
input is advanced one step, which again finds the smallest k/v across all the files and memory:
  hasMore = input.next();
  if (hasMore) {
    // compare with the current key: if equal, the next record is in the same key group
    nextKey = input.getKey();
    nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, currentRawKey.getLength(), nextKey.getData(), nextKey.getPosition(), nextKey.getLength() - nextKey.getPosition()) == 0;
  } else {
    nextKeyIsSame = false;
  }
  inputValueCounter.increment(1);
  return true;
}
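Because of the object reuse in deserialize noted above, collecting value references inside reduce is a classic bug. A sketch of the failure mode and the fix (Text is just an example Writable):

// Buggy: every list element references the same reused Writable, so after the
// loop all entries show the bytes of the LAST value.
List<Text> cached = new ArrayList<Text>();
for (Text v : values) {
  cached.add(v);
}
// Correct: copy the contents before keeping a reference.
List<Text> copied = new ArrayList<Text>();
for (Text v : values) {
  copied.add(new Text(v));
}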
Next the reduce function is called; context.getValues hands all of the key's values to reduce. ReduceContextImpl.getValues:
public Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
  return iterable;
}
It simply returns the iterable instance, created when the ReduceContextImpl instance is constructed:
private ValueIterable iterable = new ValueIterable();
ValueIterable is an inner class of ReduceContextImpl:
protected class ValueIterable implements Iterable<VALUEIN> {
  private ValueIterator iterator = new ValueIterator();
  @Override
  public Iterator<VALUEIN> iterator() {
    return iterator;
  }
}
}此实例中引用一个ValueIterator类,这也是一个内部类。每次进行执行时,通过此ValueIterator.next来获取一条数据,publicVALUEIN next() {
inReset的值默认为false.也就是说inReset检查内部的代码不会执行,其实backupStore本身值就是null如果想使用backupStore,
需要执行其内部的make函数。
if (inReset) {.................里面的代码不分析
}如果是key下面的第一个value,
把firstValue设置为false,
因为下一次来时,就不是firstValue了.返回当前的value
//if this is the first record, we don't need to advance
if (firstValue) {
firstValue = false;
returnvalue;
}
//if this isn't the first record and the next key is different, they
//can't advance it here.
if (!nextKeyIsSame) {
thrownewNoSuchElementException("iteratepast last value");
}
//otherwise, go to the next key/value pair
try {这里表示不是第一个value的时候,也就是firstValue的值为false,
执行一下nextKeyValue函数,得到当前的value.返回。nextKeyValue();
returnvalue;
} catch(IOException ie) {
thrownewRuntimeException("next valueiterator failed", ie);
} catch(InterruptedException ie) {
//this is bad, but we can't modify the exception list of java.util
thrownewRuntimeException("next valueiterator interrupted", ie);
}
}
Once reduce has finished, its output is written out directly, the same as map-side output when a job has no reduce phase.