前言
在Hadoop Job的各个运行过程中,Shuffle阶段一直是一个比较神秘的过程.因为Shuffle阶段是隶属于Reduce过程的子过程,所以很多时候会被人所忽略.但是Shffle的整个过程在map reduce的整个过程中起到1个数据过渡的作用.正因为这个模块的重要性,Hadoop把这个模块设置成了可插拔的模块,用户可以根据自己应用的类型特点,定制自己的Shuffle模块代码.之前粗粗的阅读了一下相关的代码,于是写一些内容记录一下所学的.
Shuffle过程
Shuffle过程是Reduce阶段的初始操作阶段,过程简单的理解就是"远程数据拷贝"的过程,拷贝的目标数据源就是map的中间输出结果.reduce过程要想进一步进行处理操作,首先必须要做的就是拿到这批数据.一般map的中间结果文件是写出在当前的Task运行的节点上,所以reduce task拷贝数据会经过走网络的过程.而且如果这其中的量比较大的话,会消耗掉一定的网络带宽.
Shuffle模块源码分析
下面从源代码层面浅析此模块部分的代码,首先这个阶段是属于Reduce的过程中的,所以定位到Reduce Task的代码上.
@Override
@SuppressWarnings("unchecked")
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, InterruptedException, ClassNotFoundException {
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
if (isMapOrReduce()) {
copyPhase = getProgress().addPhase("copy");
sortPhase = getProgress().addPhase("sort");
reducePhase = getProgress().addPhase("reduce");
}
....
// Initialize the codec
codec = initCodec();
RawKeyValueIterator rIter = null;
ShuffleConsumerPlugin shuffleConsumerPlugin = null;
Class combinerClass = conf.getCombinerClass();
CombineOutputCollector combineCollector =
(null != combinerClass) ?
new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
Class<? extends ShuffleConsumerPlugin> clazz =
job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
ShuffleConsumerPlugin.Context shuffleContext =
new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
super.lDirAlloc, reporter, codec,
combinerClass, combineCollector,
spilledRecordsCounter, reduceCombineInputCounter,
shuffledMapsCounter,
reduceShuffleBytes, failedShuffleCounter,
mergedMapOutputsCounter,
taskStatus, copyPhase, sortPhase, this,
mapOutputFile, localMapFiles);
shuffleConsumerPlugin.init(shuffleContext);
rIter = shuffleConsumerPlugin.run();
....
在Reduce Task中的run方法中能够看到shuffle部分的代码.首先是狗仔Shuffle上下文,然后是初始化操作,然后执行shuffle主操作.首先来看shuffle的上下文构造过程,他是一个内部类,在构造的过程中,传入了大量的变量参数,这些变量参数在Shuffle的过程中会被用到,下面是context中的变量定义,这些参数会由外部reduce task的参数传入到context上下文类中:@InterfaceAudience.LimitedPrivate("mapreduce")
@InterfaceStability.Unstable
public static class Context<K,V> {
private final org.apache.hadoop.mapreduce.TaskAttemptID reduceId;
private final JobConf jobConf;
private final FileSystem localFS;
private final TaskUmbilicalProtocol umbilical;
private final LocalDirAllocator localDirAllocator;
private final Reporter reporter;
private final CompressionCodec codec;
private final Class<? extends Reducer> combinerClass;
private final CombineOutputCollector<K, V> combineCollector;
//与Shuffle过程相关计数器
private final Counters.Counter spilledRecordsCounter;
private final Counters.Counter reduceCombineInputCounter;
private final Counters.Counter shuffledMapsCounter;
private final Counters.Counter reduceShuffleBytes;
private final Counters.Counter failedShuffleCounter;
private final Counters.Counter mergedMapOutputsCounter;
private final TaskStatus status;
private final Progress copyPhase;
private final Progress mergePhase;
private final Task reduceTask;
private final MapOutputFile mapOutputFile;
private final Map<TaskAttemptID, MapOutputFile> localMapFiles;
然后构造完成上下文后,进行init操作,就是下面这行代码:shuffleConsumerPlugin.init(shuffleContext);
shuffleConsumerPlugin是接口类,所以要找到具体的实现类,在Hadoop-2.7.1中,是名为Shuffle类.下面是进行的初始化操作@InterfaceAudience.LimitedPrivate({"MapReduce"})
@InterfaceStability.Unstable
@SuppressWarnings({"unchecked", "rawtypes"})
public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionReporter {
private static final int PROGRESS_FREQUENCY = 2000;
private static final int MAX_EVENTS_TO_FETCH = 10000;
private static final int MIN_EVENTS_TO_FETCH = 100;
private static final int MAX_RPC_OUTSTANDING_EVENTS = 3000000;
private ShuffleConsumerPlugin.Context context;
.....
@Override
public void init(ShuffleConsumerPlugin.Context context) {
this.context = context;
this.reduceId = context.getReduceId();
this.jobConf = context.getJobConf();
this.umbilical = context.getUmbilical();
this.reporter = context.getReporter();
this.metrics = new ShuffleClientMetrics(reduceId, jobConf);
this.copyPhase = context.getCopyPhase();
this.taskStatus = context.getStatus();
this.reduceTask = context.getReduceTask();
this.localMapFiles = context.getLocalMapFiles();
scheduler = new ShuffleSchedulerImpl<K, V>(jobConf, taskStatus, reduceId,
this, copyPhase, context.getShuffledMapsCounter(),
context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
merger = createMergeManager(context);
}
将之前上下文中的参数变量值赋值到自己的内部变量中.还需要关注一下,代码最后一行merge操作类.merge操作会在shuffle阶段尾声阶段进行.下面是执行主方法:rIter = shuffleConsumerPlugin.run();
跳到具体的实现方法中:@Override
public RawKeyValueIterator run() throws IOException, InterruptedException {
......
// Start the map-completion events fetcher thread
final EventFetcher<K,V> eventFetcher =
new EventFetcher<K,V>(reduceId, umbilical, scheduler, this,
maxEventsToFetch);
eventFetcher.start();
// Start the map-output fetcher threads
boolean isLocal = localMapFiles != null;
final int numFetchers = isLocal ? 1 :
jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
if (isLocal) {
fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
localMapFiles);
fetchers[0].start();
} else {
for (int i=0; i < numFetchers; ++i) {
fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
reporter, metrics, this,
reduceTask.getShuffleSecret());
fetchers[i].start();
}
}
......
在Shuffle的主操作中可以看到,首先会根据map输出结果是否具有本地性,如果是在本地的,传入mapFile文件地址,然后都会新建若干个fetcher线程,来远程抓取数据.所以这里的核心操作应该是在Fetcher类中实现的.在Fetcher的start方法会执行拷贝的主要操作.public void run() {
try {
while (!stopped && !Thread.currentThread().isInterrupted()) {
MapHost host = null;
try {
// If merge is on, block
merger.waitForResource();
// Get a host to shuffle from
host = scheduler.getHost();
metrics.threadBusy();
// Shuffle
copyFromHost(host);
} finally {
if (host != null) {
scheduler.freeHost(host);
metrics.threadFree();
}
}
}
} catch (InterruptedException ie) {
return;
} catch (Throwable t) {
exceptionReporter.reportException(t);
}
}
进入copyFromHost方法,在方法的其实部分,先获取目标已经结束运行的map task列表. @VisibleForTesting
protected void copyFromHost(MapHost host) throws IOException {
// reset retryStartTime for a new host
retryStartTime = 0;
// Get completed maps on 'host'
List<TaskAttemptID> maps = scheduler.getMapsForHost(host);
....
也就是说,后面的Shuffle阶段就会从这些map task运行所在的节点上进行fetch data的操作.在拷贝操作之前,维护一个remaining剩余变量操作// List of maps to be fetched yet
Set<TaskAttemptID> remaining = new HashSet<TaskAttemptID>(maps);
接着首先根据变量获取输入流数据,判断是否在map task的host上是否真正存在数据 // Construct the url and connect
URL url = getMapOutputURL(host, maps);
DataInputStream input = openShuffleUrl(host, remaining, url);
if (input == null) {
return;
}
然后后面的操作进行循环的拷贝读取TaskAttemptID[] failedTasks = null;
while (!remaining.isEmpty() && failedTasks == null) {
try {
failedTasks = copyMapOutput(host, input, remaining, fetchRetryEnabled);
} catch (IOException e) {
//
// Setup connection again if disconnected by NM
connection.disconnect();
// Get map output from remaining tasks only.
url = getMapOutputURL(host, remaining);
input = openShuffleUrl(host, remaining, url);
if (input == null) {
return;
}
}
}
构造url,获得输入流数据,如果出现失败任务则就会退出循环,正常情况下,remain map数为空了,循环自然会退出.这里的拷贝操作细节又来到了copyMapOutput中.在拷贝操作之前,会进行拷贝总大小的计算,从输入流中读取.
private TaskAttemptID[] copyMapOutput(MapHost host,
DataInputStream input,
Set<TaskAttemptID> remaining,
boolean canRetry) throws IOException {
MapOutput<K,V> mapOutput = null;
TaskAttemptID mapId = null;
long decompressedLength = -1;
long compressedLength = -1;
try {
long startTime = Time.monotonicNow();
int forReduce = -1;
//Read the shuffle header
try {
ShuffleHeader header = new ShuffleHeader();
header.readFields(input);
mapId = TaskAttemptID.forName(header.mapId);
compressedLength = header.compressedLength;
decompressedLength = header.uncompressedLength;
forReduce = header.forReduce;
.....
然后会根据计算过的拷贝数据量的大小,判断将数据拷贝到内存中还是磁盘中,然后返回相应的输出对象.// Do some basic sanity verification
if (!verifySanity(compressedLength, decompressedLength, forReduce,
remaining, mapId)) {
return new TaskAttemptID[] {mapId};
}
if(LOG.isDebugEnabled()) {
LOG.debug("header: " + mapId + ", len: " + compressedLength +
", decomp len: " + decompressedLength);
}
// Get the location for the map output - either in-memory or on-disk
try {
mapOutput = merger.reserve(mapId, decompressedLength, id);
....
reserve方法就是主要进行判断操作的@Override
public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId,
long requestedSize,
int fetcher
) throws IOException {
if (!canShuffleToMemory(requestedSize)) {
LOG.info(mapId + ": Shuffling to disk since " + requestedSize +
" is greater than maxSingleShuffleLimit (" +
maxSingleShuffleLimit + ")");
return new OnDiskMapOutput<K,V>(mapId, reduceId, this, requestedSize,
jobConf, mapOutputFile, fetcher, true);
}
.....
if (usedMemory > memoryLimit) {
LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory
+ ") is greater than memoryLimit (" + memoryLimit + ")." +
" CommitMemory is (" + commitMemory + ")");
return null;
}
// Allow the in-memory shuffle to progress
LOG.debug(mapId + ": Proceeding with shuffle since usedMemory ("
+ usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
+ "CommitMemory is (" + commitMemory + ")");
return unconditionalReserve(mapId, requestedSize, true);
}
/**
* Unconditional Reserve is used by the Memory-to-Memory thread
* @return
*/
private synchronized InMemoryMapOutput<K, V> unconditionalReserve(
TaskAttemptID mapId, long requestedSize, boolean primaryMapOutput) {
usedMemory += requestedSize;
return new InMemoryMapOutput<K,V>(jobConf, mapId, this, (int)requestedSize,
codec, primaryMapOutput);
}
返回的对象有2种,1种是拷贝到内存中InMemoryMapOutput,还有1种是磁盘上的,OnDiskMapOutput.目标确定之后,进行Shuffle远程拷贝操作 ....
// The codec for lz0,lz4,snappy,bz2,etc. throw java.lang.InternalError
// on decompression failures. Catching and re-throwing as IOException
// to allow fetch failure logic to be processed
try {
// Go!
LOG.info("fetcher#" + id + " about to shuffle output of map "
+ mapOutput.getMapId() + " decomp: " + decompressedLength
+ " len: " + compressedLength + " to " + mapOutput.getDescription());
mapOutput.shuffle(host, is, compressedLength, decompressedLength,
metrics, reporter);
} catch (java.lang.InternalError e) {
LOG.warn("Failed to shuffle for fetcher#"+id, e);
throw new IOException(e);
}
最后在这些操作完成之后,为了防止内存中的空间被占用过大,或者磁盘中的小文件数太多,会进行一次merge和并操作,在最后的Shuffle类的merge.close()方法中会调用.// Start the map-output fetcher threads
boolean isLocal = localMapFiles != null;
final int numFetchers = isLocal ? 1 :
jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
if (isLocal) {
fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
localMapFiles);
fetchers[0].start();
} else {
for (int i=0; i < numFetchers; ++i) {
fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
reporter, metrics, this,
reduceTask.getShuffleSecret());
fetchers[i].start();
}
}
.....
// Finish the on-going merges...
RawKeyValueIterator kvIter = null;
try {
kvIter = merger.close();
} catch (Throwable e) {
throw new ShuffleError("Error while doing final merge " , e);
}
@Override
public RawKeyValueIterator close() throws Throwable {
// Wait for on-going merges to complete
if (memToMemMerger != null) {
memToMemMerger.close();
}
inMemoryMerger.close();
onDiskMerger.close();
List<InMemoryMapOutput<K, V>> memory =
new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
inMemoryMergedMapOutputs.clear();
memory.addAll(inMemoryMapOutputs);
inMemoryMapOutputs.clear();
List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
onDiskMapOutputs.clear();
return finalMerge(jobConf, rfs, memory, disk);
}
OK,以上操作的结束,就是整个Reduce Shuffle的过程操作.下面是一张简易的流程分析图其他方面代码的分析请点击链接https://github.com/linyiqun/hadoop-yarn,后续将会继续更新YARN其他方面的代码分析。
参考源码
Apach-hadoop-2.7.1(hadoop-hdfs-project)