zoukankan      html  css  js  c++  java
  • OutputFormat输出过程的学习

    转自:http://blog.csdn.net/androidlushangderen/article/details/41278351

    花了大约1周的时间,终于把MapReduce的5大阶段的源码学习结束掉了,收获不少,就算本人对Hadoop学习的一个里程碑式的纪念吧。今天花了一点点的时间,把MapReduce的最后一个阶段,输出OutputFormat给做了分析,这个过程跟InputFormat刚刚好是对着干的,二者极具对称性。为什么这么说呢,待我一一分析。

                OutputFormat过程的作用就是定义数据key-value的输出格式,给你处理好后的数据,究竟以什么样的形式输出呢,才能让下次别人拿到这个文件的时候能准确的提取出里面的数据。这里,我们撇开这个话题,仅仅我知道的一些定义的数据格式的方法,比如在Redis中会有这样的设计:

    [key-length][key][value-length][value][key-length][key][value-length][value]...

    或者说不一定非要省空间,直接搞过分隔符

    [key]   [value]

    [key]   [value]

    [key]   [value]

    .....

    这样逐行读取,再以空格隔开,取出里面的键值对,这么做简单是简单,就是不紧凑,空间浪费得有点多。在MapReduce的OutputFormat的有种格式用的就是这种方式。

            首先必须得了解OutputFormat里面到底有什么东西:

    1. public interface OutputFormat<K, V> {  
    2.   
    3.   /**  
    4.    * Get the {@link RecordWriter} for the given job. 
    5.    * 获取输出记录键值记录 
    6.    * 
    7.    * @param ignored 
    8.    * @param job configuration for the job whose output is being written. 
    9.    * @param name the unique name for this part of the output. 
    10.    * @param progress mechanism for reporting progress while writing to file. 
    11.    * @return a {@link RecordWriter} to write the output for the job. 
    12.    * @throws IOException 
    13.    */  
    14.   RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,  
    15.                                      String name, Progressable progress)  
    16.   throws IOException;  
    17.   
    18.   /**  
    19.    * Check for validity of the output-specification for the job. 
    20.    *   
    21.    * <p>This is to validate the output specification for the job when it is 
    22.    * a job is submitted.  Typically checks that it does not already exist, 
    23.    * throwing an exception when it already exists, so that output is not 
    24.    * overwritten.</p> 
    25.    * 作业运行之前进行的检测工作,例如配置的输出目录是否存在等 
    26.    * 
    27.    * @param ignored 
    28.    * @param job job configuration. 
    29.    * @throws IOException when output should not be attempted 
    30.    */  
    31.   void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;  
    32. }  

    很简单的2个方法,RecordWriter比较重要,后面的key-value的写入操作都是根据他来完成的。但是他是一个接口,在MapReduce中,我们用的最多的他的子类是FileOutputFormat:

    1. /** A base class for {@link OutputFormat}. */  
    2. public abstract class FileOutputFormat<K, V> implements OutputFormat<K, V> {  

    他是一个抽象类,但是实现了接口中的第二个方法checkOutputSpecs()方法:

    1. public void checkOutputSpecs(FileSystem ignored, JobConf job)   
    2.     throws FileAlreadyExistsException,   
    3.            InvalidJobConfException, IOException {  
    4.     // Ensure that the output directory is set and not already there  
    5.     Path outDir = getOutputPath(job);  
    6.     if (outDir == null && job.getNumReduceTasks() != 0) {  
    7.       throw new InvalidJobConfException("Output directory not set in JobConf.");  
    8.     }  
    9.     if (outDir != null) {  
    10.       FileSystem fs = outDir.getFileSystem(job);  
    11.       // normalize the output directory  
    12.       outDir = fs.makeQualified(outDir);  
    13.       setOutputPath(job, outDir);  
    14.         
    15.       // get delegation token for the outDir's file system  
    16.       TokenCache.obtainTokensForNamenodes(job.getCredentials(),   
    17.                                           new Path[] {outDir}, job);  
    18.         
    19.       // check its existence  
    20.       if (fs.exists(outDir)) {  
    21.         //如果输出目录以及存在,则抛异常  
    22.         throw new FileAlreadyExistsException("Output directory " + outDir +   
    23.                                              " already exists");  
    24.       }  
    25.     }  
    26.   }  

    就是检查输出目录在不在的操作。在这个类里还出现了一个辅助类:

    1. public static Path getTaskOutputPath(JobConf conf, String name)   
    2.   throws IOException {  
    3.     // ${mapred.out.dir}  
    4.     Path outputPath = getOutputPath(conf);  
    5.     if (outputPath == null) {  
    6.       throw new IOException("Undefined job output-path");  
    7.     }  
    8.       
    9.     //根据OutputCommitter获取输出路径  
    10.     OutputCommitter committer = conf.getOutputCommitter();  
    11.     Path workPath = outputPath;  
    12.     TaskAttemptContext context = new TaskAttemptContext(conf,  
    13.                 TaskAttemptID.forName(conf.get("mapred.task.id")));  
    14.     if (committer instanceof FileOutputCommitter) {  
    15.       workPath = ((FileOutputCommitter)committer).getWorkPath(context,  
    16.                                                               outputPath);  
    17.     }  
    18.       
    19.     // ${mapred.out.dir}/_temporary/_${taskid}/${name}  
    20.     return new Path(workPath, name);  
    21.   }  

    就是上面OutputCommiter,里面定义了很多和Task,job作业相关的方法。很多时候都会与OutputFormat合作的形式出现。他也有自己的子类实现FileOutputCommiter:

    1. public class FileOutputCommitter extends OutputCommitter {  
    2.   
    3.   public static final Log LOG = LogFactory.getLog(  
    4.       "org.apache.hadoop.mapred.FileOutputCommitter");  
    5. /** 
    6.    * Temporary directory name  
    7.    */  
    8.   public static final String TEMP_DIR_NAME = "_temporary";  
    9.   public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";  
    10.   static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =  
    11.     "mapreduce.fileoutputcommitter.marksuccessfuljobs";  
    12.   
    13.   public void setupJob(JobContext context) throws IOException {  
    14.     JobConf conf = context.getJobConf();  
    15.     Path outputPath = FileOutputFormat.getOutputPath(conf);  
    16.     if (outputPath != null) {  
    17.       Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);  
    18.       FileSystem fileSys = tmpDir.getFileSystem(conf);  
    19.       if (!fileSys.mkdirs(tmpDir)) {  
    20.         LOG.error("Mkdirs failed to create " + tmpDir.toString());  
    21.       }  
    22.     }  
    23.   }  
    24.   ....  

    在Reduce阶段的后面的写阶段,FileOutputFormat是默认的输出的类型:

    1. //获取输出的key,value  
    2.     final RecordWriter<OUTKEY, OUTVALUE> out = new OldTrackingRecordWriter<OUTKEY, OUTVALUE>(  
    3.         reduceOutputCounter, job, reporter, finalName);  
    4.       
    5.     OutputCollector<OUTKEY,OUTVALUE> collector =   
    6.       new OutputCollector<OUTKEY,OUTVALUE>() {  
    7.         public void collect(OUTKEY key, OUTVALUE value)  
    8.           throws IOException {  
    9.           //将处理后的key,value写入输出流中,最后写入HDFS作为最终结果  
    10.           out.write(key, value);  
    11.           // indicate that progress update needs to be sent  
    12.           reporter.progress();  
    13.         }  
    14.       };  

    out就是直接发挥作用的类,但是是哪个Formtat的返回的呢,我们进入OldTrackingRecordWriter继续看:

    1. public OldTrackingRecordWriter(  
    2.         org.apache.hadoop.mapred.Counters.Counter outputRecordCounter,  
    3.         JobConf job, TaskReporter reporter, String finalName)  
    4.         throws IOException {  
    5.       this.outputRecordCounter = outputRecordCounter;  
    6.       //默认是FileOutputFormat文件输出方式  
    7.       this.fileOutputByteCounter = reporter  
    8.           .getCounter(FileOutputFormat.Counter.BYTES_WRITTEN);  
    9.       Statistics matchedStats = null;  
    10.       if (job.getOutputFormat() instanceof FileOutputFormat) {  
    11.         matchedStats = getFsStatistics(FileOutputFormat.getOutputPath(job), job);  
    12.       }  
    13.       fsStats = matchedStats;  
    14.   
    15.       FileSystem fs = FileSystem.get(job);  
    16.       long bytesOutPrev = getOutputBytes(fsStats);  
    17.       //从配置中获取作业的输出方式  
    18.       this.real = job.getOutputFormat().getRecordWriter(fs, job, finalName,  
    19.           reporter);  
    20.       long bytesOutCurr = getOutputBytes(fsStats);  
    21.       fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);  
    22.     }  

    果然是我们所想的那样,FileOutputFormat,但是不要忘了它的getRecordWriter()是抽象方法,调用它还必须由它的子类来实现:

    1. public abstract RecordWriter<K, V> getRecordWriter(FileSystem ignored,  
    2.                                               JobConf job, String name,  
    3.                                               Progressable progress)  
    4.    throws IOException;  

    在这里我们举出其中在InputFormat举过的例子,TextOutputFormat,SequenceFileOutputFormat,与TextInputFormat,SequenceFileInputFormat对应。也就说说上面2个子类定义了2种截然不同的输出格式,也就返回了不一样的RecordWriter实现类.在TextOutputFormat中,他定义了一个叫LineRecordWriter的定义:

    1. public RecordWriter<K, V> getRecordWriter(FileSystem ignored,  
    2.                                                  JobConf job,  
    3.                                                  String name,  
    4.                                                  Progressable progress)  
    5.    throws IOException {  
    6. //从配置判断输出是否要压缩  
    7.    boolean isCompressed = getCompressOutput(job);  
    8.    //配置中获取加在key-value的分隔符  
    9.    String keyValueSeparator = job.get("mapred.textoutputformat.separator",   
    10.                                       " ");  
    11.    //根据是否压缩获取相应的LineRecordWriter  
    12.    if (!isCompressed) {  
    13.      Path file = FileOutputFormat.getTaskOutputPath(job, name);  
    14.      FileSystem fs = file.getFileSystem(job);  
    15.      FSDataOutputStream fileOut = fs.create(file, progress);  
    16.      return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);  
    17.    } else {  
    18.      Class<? extends CompressionCodec> codecClass =  
    19.        getOutputCompressorClass(job, GzipCodec.class);  
    20.      // create the named codec  
    21.      CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);  
    22.      // build the filename including the extension  
    23.      Path file =   
    24.        FileOutputFormat.getTaskOutputPath(job,   
    25.                                           name + codec.getDefaultExtension());  
    26.      FileSystem fs = file.getFileSystem(job);  
    27.      FSDataOutputStream fileOut = fs.create(file, progress);  
    28.      return new LineRecordWriter<K, V>(new DataOutputStream  
    29.                                        (codec.createOutputStream(fileOut)),  
    30.                                        keyValueSeparator);  
    31.    }  

    他以一个内部类的形式存在于TextOutputFormat。而在SequenceFileOutputFormat中,他的形式是怎样的呢:

    1. public RecordWriter<K, V> getRecordWriter(  
    2.                                         FileSystem ignored, JobConf job,  
    3.                                         String name, Progressable progress)  
    4.   throws IOException {  
    5.   // get the path of the temporary output file   
    6.   Path file = FileOutputFormat.getTaskOutputPath(job, name);  
    7.     
    8.   FileSystem fs = file.getFileSystem(job);  
    9.   CompressionCodec codec = null;  
    10.   CompressionType compressionType = CompressionType.NONE;  
    11.   if (getCompressOutput(job)) {  
    12.     // find the kind of compression to do  
    13.     compressionType = getOutputCompressionType(job);  
    14.   
    15.     // find the right codec  
    16.     Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,  
    17.  DefaultCodec.class);  
    18.     codec = ReflectionUtils.newInstance(codecClass, job);  
    19.   }  
    20.   final SequenceFile.Writer out =   
    21.     SequenceFile.createWriter(fs, job, file,  
    22.                               job.getOutputKeyClass(),  
    23.                               job.getOutputValueClass(),  
    24.                               compressionType,  
    25.                               codec,  
    26.                               progress);  
    27.   
    28.   return new RecordWriter<K, V>() {  
    29.   
    30.       public void write(K key, V value)  
    31.         throws IOException {  
    32.   
    33.         out.append(key, value);  
    34.       }  
    35.   
    36.       public void close(Reporter reporter) throws IOException { out.close();}  
    37.     };  
    38. }  

    关键的操作都在于SequenceFile.Writer中。有不同的RecordWriter就会有不同的写入数据的方式,这里我们举LineRecordWriter的例子。看看他的写入方法:

    1. //往输出流中写入key-value  
    2.     public synchronized void write(K key, V value)  
    3.       throws IOException {  
    4.   
    5.       //判断键值对是否为空  
    6.       boolean nullKey = key == null || key instanceof NullWritable;  
    7.       boolean nullValue = value == null || value instanceof NullWritable;  
    8.         
    9.       //如果k-v都为空,则操作失败,不写入直接返回  
    10.       if (nullKey && nullValue) {  
    11.         return;  
    12.       }  
    13.         
    14.       //如果key不空,则写入key  
    15.       if (!nullKey) {  
    16.         writeObject(key);  
    17.       }  
    18.         
    19.       //如果key,value都不为空,则中间写入k-v分隔符,在这里为 空格符  
    20.       if (!(nullKey || nullValue)) {  
    21.         out.write(keyValueSeparator);  
    22.       }  
    23.         
    24.       //最后写入value  
    25.       if (!nullValue) {  
    26.         writeObject(value);  
    27.       }  

    在这个方法里,我们就能看出他的存储形式就是我刚刚在上面讲的第二种存储方式。这个方法将会在下面的代码中被执行:

    1. OutputCollector<OUTKEY,OUTVALUE> collector =   
    2.       new OutputCollector<OUTKEY,OUTVALUE>() {  
    3.         public void collect(OUTKEY key, OUTVALUE value)  
    4.           throws IOException {  
    5.           //将处理后的key,value写入输出流中,最后写入HDFS作为最终结果  
    6.           out.write(key, value);  
    7.           // indicate that progress update needs to be sent  
    8.           reporter.progress();  
    9.         }  
    10.       };  

    过程可以这么理解:

    collector.collect()------->out.write(key, value)------->LineRecordWriter.write(key, value)------->DataOutputStream.write(key, value).

    DataOutputStream是内置于LineRecordWriter的作为里面的变量存在的。这样从Reduce末尾阶段到Output的过程也完全打通了。下面可以看看这上面涉及的完整的类目关系。

          下一阶段的学习,可能或偏向于Task,Job阶段的过程分析,更加宏观过程上的一个分析。也可能会分析某个功能块的实现过程,比如Hadoop的IPC过程,据说用了很多JAVA NIO的东西。

  • 相关阅读:
    组合与封装
    继承与派生
    面向对象编程
    subprocess、re、logging模块
    json、pickle、collections、openpyxl模块
    python内置模块
    递归函数与模块
    生成式、面向过程、与函数式
    叠加装饰器与迭代器
    闭包函数与装饰器
  • 原文地址:https://www.cnblogs.com/cxzdy/p/5043998.html
Copyright © 2011-2022 走看看