花了大约1周的时间,最终把MapReduce的5大阶段的源代码学习结束掉了。收获不少。就算本人对Hadoop学习的一个里程碑式的纪念吧。今天花了一点点的时间,把MapReduce的最后一个阶段。输出OutputFormat给做了分析,这个过程跟InputFormat刚刚好是对着干的,二者极具对称性。为什么这么说呢,待我一一分析。
OutputFormat过程的作用就是定义数据key-value的输出格式,给你处理好后的数据,到底以什么样的形式输出呢。才干让下次别人拿到这个文件的时候能准确的提取出里面的数据。这里,我们撇开这个话题,只我知道的一些定义的数据格式的方法,比方在Redis中会有这种设计:
[key-length][key][value-length][value][key-length][key][value-length][value]...
或者说不一定非要省空间,直接搞过分隔符
[key] [value]
[key] [value]
[key] [value]
.....
这样逐行读取,再以空格隔开,取出里面的键值对。这么做简单是简单。就是不紧凑,空间浪费得有点多。在MapReduce的OutputFormat的有种格式用的就是这样的方式。
首先必须得了解OutputFormat里面究竟有什么东西:
public interface OutputFormat<K, V> { /** * Get the {@link RecordWriter} for the given job. * 获取输出记录键值记录 * * @param ignored * @param job configuration for the job whose output is being written. * @param name the unique name for this part of the output. * @param progress mechanism for reporting progress while writing to file. * @return a {@link RecordWriter} to write the output for the job. * @throws IOException */ RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException; /** * Check for validity of the output-specification for the job. * * <p>This is to validate the output specification for the job when it is * a job is submitted. Typically checks that it does not already exist, * throwing an exception when it already exists, so that output is not * overwritten.</p> * 作业执行之前进行的检測工作,比如配置的输出文件夹是否存在等 * * @param ignored * @param job job configuration. * @throws IOException when output should not be attempted */ void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException; }非常easy的2个方法,RecordWriter比較重要,后面的key-value的写入操作都是依据他来完毕的。可是他是一个接口,在MapReduce中,我们用的最多的他的子类是FileOutputFormat:
/** A base class for {@link OutputFormat}. */ public abstract class FileOutputFormat<K, V> implements OutputFormat<K, V> {他是一个抽象类。可是实现了接口中的第二个方法checkOutputSpecs()方法:
public void checkOutputSpecs(FileSystem ignored, JobConf job) throws FileAlreadyExistsException, InvalidJobConfException, IOException { // Ensure that the output directory is set and not already there Path outDir = getOutputPath(job); if (outDir == null && job.getNumReduceTasks() != 0) { throw new InvalidJobConfException("Output directory not set in JobConf."); } if (outDir != null) { FileSystem fs = outDir.getFileSystem(job); // normalize the output directory outDir = fs.makeQualified(outDir); setOutputPath(job, outDir); // get delegation token for the outDir's file system TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] {outDir}, job); // check its existence if (fs.exists(outDir)) { //假设输出文件夹以及存在,则抛异常 throw new FileAlreadyExistsException("Output directory " + outDir + " already exists"); } } }就是检查输出文件夹在不在的操作。在这个类里还出现了一个辅助类:
public static Path getTaskOutputPath(JobConf conf, String name) throws IOException { // ${mapred.out.dir} Path outputPath = getOutputPath(conf); if (outputPath == null) { throw new IOException("Undefined job output-path"); } //依据OutputCommitter获取输出路径 OutputCommitter committer = conf.getOutputCommitter(); Path workPath = outputPath; TaskAttemptContext context = new TaskAttemptContext(conf, TaskAttemptID.forName(conf.get("mapred.task.id"))); if (committer instanceof FileOutputCommitter) { workPath = ((FileOutputCommitter)committer).getWorkPath(context, outputPath); } // ${mapred.out.dir}/_temporary/_${taskid}/${name} return new Path(workPath, name); }就是上面OutputCommiter,里面定义了非常多和Task,job作业相关的方法。非常多时候都会与OutputFormat合作的形式出现。
他也有自己的子类实现FileOutputCommiter:
public class FileOutputCommitter extends OutputCommitter { public static final Log LOG = LogFactory.getLog( "org.apache.hadoop.mapred.FileOutputCommitter"); /** * Temporary directory name */ public static final String TEMP_DIR_NAME = "_temporary"; public static final String SUCCEEDED_FILE_NAME = "_SUCCESS"; static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = "mapreduce.fileoutputcommitter.marksuccessfuljobs"; public void setupJob(JobContext context) throws IOException { JobConf conf = context.getJobConf(); Path outputPath = FileOutputFormat.getOutputPath(conf); if (outputPath != null) { Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME); FileSystem fileSys = tmpDir.getFileSystem(conf); if (!fileSys.mkdirs(tmpDir)) { LOG.error("Mkdirs failed to create " + tmpDir.toString()); } } } ....在Reduce阶段的后面的写阶段,FileOutputFormat是默认的输出的类型:
//获取输出的key,value final RecordWriter<OUTKEY, OUTVALUE> out = new OldTrackingRecordWriter<OUTKEY, OUTVALUE>( reduceOutputCounter, job, reporter, finalName); OutputCollector<OUTKEY,OUTVALUE> collector = new OutputCollector<OUTKEY,OUTVALUE>() { public void collect(OUTKEY key, OUTVALUE value) throws IOException { //将处理后的key,value写入输出流中。最后写入HDFS作为终于结果 out.write(key, value); // indicate that progress update needs to be sent reporter.progress(); } };out就是直接发挥作用的类,可是是哪个Formtat的返回的呢。我们进入OldTrackingRecordWriter继续看:
public OldTrackingRecordWriter( org.apache.hadoop.mapred.Counters.Counter outputRecordCounter, JobConf job, TaskReporter reporter, String finalName) throws IOException { this.outputRecordCounter = outputRecordCounter; //默认是FileOutputFormat文件输出方式 this.fileOutputByteCounter = reporter .getCounter(FileOutputFormat.Counter.BYTES_WRITTEN); Statistics matchedStats = null; if (job.getOutputFormat() instanceof FileOutputFormat) { matchedStats = getFsStatistics(FileOutputFormat.getOutputPath(job), job); } fsStats = matchedStats; FileSystem fs = FileSystem.get(job); long bytesOutPrev = getOutputBytes(fsStats); //从配置中获取作业的输出方式 this.real = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter); long bytesOutCurr = getOutputBytes(fsStats); fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev); }果然是我们所想的那样。FileOutputFormat,可是不要忘了它的getRecordWriter()是抽象方法,调用它还必须由它的子类来实现:
public abstract RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException;在这里我们举出当中在InputFormat举过的样例,TextOutputFormat,SequenceFileOutputFormat,与TextInputFormat,SequenceFileInputFormat相应。也就说说上面2个子类定义了2种截然不同的输出格式,也就返回了不一样的RecordWriter实现类.在TextOutputFormat中。他定义了一个叫LineRecordWriter的定义:
public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { //从配置推断输出是否要压缩 boolean isCompressed = getCompressOutput(job); //配置中获取加在key-value的分隔符 String keyValueSeparator = job.get("mapred.textoutputformat.separator", " "); //依据是否压缩获取对应的LineRecordWriter if (!isCompressed) { Path file = FileOutputFormat.getTaskOutputPath(job, name); FileSystem fs = file.getFileSystem(job); FSDataOutputStream fileOut = fs.create(file, progress); return new LineRecordWriter<K, V>(fileOut, keyValueSeparator); } else { Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class); // create the named codec CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job); // build the filename including the extension Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension()); FileSystem fs = file.getFileSystem(job); FSDataOutputStream fileOut = fs.create(file, progress); return new LineRecordWriter<K, V>(new DataOutputStream (codec.createOutputStream(fileOut)), keyValueSeparator); }他以一个内部类的形式存在于TextOutputFormat。而在SequenceFileOutputFormat中,他的形式是如何的呢:
public RecordWriter<K, V> getRecordWriter( FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { // get the path of the temporary output file Path file = FileOutputFormat.getTaskOutputPath(job, name); FileSystem fs = file.getFileSystem(job); CompressionCodec codec = null; CompressionType compressionType = CompressionType.NONE; if (getCompressOutput(job)) { // find the kind of compression to do compressionType = getOutputCompressionType(job); // find the right codec Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, DefaultCodec.class); codec = ReflectionUtils.newInstance(codecClass, job); } final SequenceFile.Writer out = SequenceFile.createWriter(fs, job, file, job.getOutputKeyClass(), job.getOutputValueClass(), compressionType, codec, progress); return new RecordWriter<K, V>() { public void write(K key, V value) throws IOException { out.append(key, value); } public void close(Reporter reporter) throws IOException { out.close();} }; }关键的操作都在于SequenceFile.Writer中。有不同的RecordWriter就会有不同的写入数据的方式。这里我们举LineRecordWriter的样例。看看他的写入方法:
//往输出流中写入key-value public synchronized void write(K key, V value) throws IOException { //推断键值对是否为空 boolean nullKey = key == null || key instanceof NullWritable; boolean nullValue = value == null || value instanceof NullWritable; //假设k-v都为空,则操作失败,不写入直接返回 if (nullKey && nullValue) { return; } //假设key不空。则写入key if (!nullKey) { writeObject(key); } //假设key,value都不为空,则中间写入k-v分隔符。在这里为 空格符 if (!(nullKey || nullValue)) { out.write(keyValueSeparator); } //最后写入value if (!nullValue) { writeObject(value); }在这种方法里。我们就能看出他的存储形式就是我刚刚在上面讲的另外一种存储方式。这种方法将会在以下的代码中被运行:
OutputCollector<OUTKEY,OUTVALUE> collector = new OutputCollector<OUTKEY,OUTVALUE>() { public void collect(OUTKEY key, OUTVALUE value) throws IOException { //将处理后的key,value写入输出流中,最后写入HDFS作为终于结果 out.write(key, value); // indicate that progress update needs to be sent reporter.progress(); } };过程能够这么理解:
collector.collect()------->out.write(key, value)------->LineRecordWriter.write(key, value)------->DataOutputStream.write(key, value).
DataOutputStream是内置于LineRecordWriter的作为里面的变量存在的。这样从Reduce末尾阶段到Output的过程也全然打通了。以下能够看看这上面涉及的完整的类目关系。
下一阶段的学习,可能或偏向于Task。Job阶段的过程分析,更加宏观过程上的一个分析。也可能会分析某个功能块的实现过程,比方Hadoop的IPC过程。据说用了非常多JAVA NIO的东西。