zoukankan      html  css  js  c++  java
  • Hadoop OutputFormat浅析

    问题:reduce输出时,如果不是推测任务写结果时会先写临时目录最后移动到输出目录吗?

    下面部分转自Hadoop官网说明

    OutputFormat 描述Map/Reduce作业的输出样式。

    Map/Reduce框架根据作业的OutputFormat来:

    1. 检验作业的输出,例如检查输出路径是否已经存在。
    2. 提供一个RecordWriter的实现,用来输出作业结果。 输出文件保存在FileSystem上。

    TextOutputFormat是默认的 OutputFormat。

    任务的Side-Effect File

    在一些应用程序中,子任务需要产生一些side-file,这些文件与作业实际输出结果的文件不同。

    在这种情况下,同一个Mapper或者Reducer的两个实例(比如预防性任务)同时打开或者写 FileSystem上的同一文件就会产生冲突。因此应用程序在写文件的时候需要为每次任务尝试(不仅仅是每次任务,每个任务可以尝试执行很多次)选取一个独一无二的文件名(使用attemptid,例如task_200709221812_0001_m_000000_0)。

    为了避免冲突,Map/Reduce框架为每次尝试执行任务都建立和维护一个特殊的 ${mapred.output.dir}/_temporary/_${taskid}子目录,这个目录位于本次尝试执行任务输出结果所在的FileSystem上,可以通过${mapred.work.output.dir}来访问这个子目录。 对于成功完成的任务尝试,只有${mapred.output.dir}/_temporary/_${taskid}下的文件会移动${mapred.output.dir}。当然,框架会丢弃那些失败的任务尝试的子目录。这种处理过程对于应用程序来说是完全透明的。

    在任务执行期间,应用程序在写文件时可以利用这个特性,比如 通过 FileOutputFormat.getWorkOutputPath()获得${mapred.work.output.dir}目录, 并在其下创建任意任务执行时所需的side-file,框架在任务尝试成功时会马上移动这些文件,因此不需要在程序内为每次任务尝试选取一个独一无二的名字。

    注意:在每次任务尝试执行期间,${mapred.work.output.dir} 的值实际上是 ${mapred.output.dir}/_temporary/_{$taskid},这个值是Map/Reduce框架创建的。 所以使用这个特性的方法是,在FileOutputFormat.getWorkOutputPath() 路径下创建side-file即可。

    对于只使用map不使用reduce的作业,这个结论也成立。这种情况下,map的输出结果直接生成到HDFS上。

    RecordWriter

    RecordWriter 生成<key, value> 对到输出文件。

    RecordWriter的实现把作业的输出结果写到 FileSystem。

    下面部分转自http://www.cnblogs.com/noures/archive/2012/07/13/2589767.html

    在 Hadoop中,OutputFormat和InputFormat是相对应的两个东西。相比于InputFormat,OutputFormat似乎没 有那么多细节。InputFormat涉及到对输入数据的解析和划分,继而影响到Map任务的数目,以及Map任务的调度(见Hadoop InputFormat浅析》)。而OutputFormat似乎像其字面意思那样,仅仅是完成对输出数据的格式化。

    对于输出数据的格式化,这个应该没什么值得多说的。根据需要,OutputFormat爱把输出写成什么格式就写成什么格式、爱把输出写到数据库就写到数据库、爱把输出通过网络发给其他服务就发给其他服务...

     
    不过,OutputFormat所做的事情其实并不限于此。OutputFormat类包含如下三个方法:
    RecordWriter  getRecordWriter(TaskAttemptContext context);
    void  checkOutputSpecs(JobContext context);
    OutputCommitter  getOutputCommitter(TaskAttemptContext context);
    其中:
    checkOutputSpecs是 在JobClient提交Job之前被调用的(在使用InputFomat进行输入数据划分之前),用于检测Job的输出路径。比 如,FileOutputFormat通过这个方法来确认在Job开始之前,Job的Output路径并不存在,然后该方法又会重新创建这个Output 路径。这样一来,就能确保Job结束后,Output路径下的东西就是且仅是该Job输出的。
     

    getRecordWriter用于返回一个RecordWriter的实例,Reduce任务在执行的时候就是利用这个实例来输出Key/Value的。(如果Job不需要Reduce,那么Map任务会直接使用这个实例来进行输出。)

    RecordWriter有如下两个方法:

    void  write(K key, V value);
    void  close(TaskAttemptContext context);
    前者负责将Reduce输出的Key/Value写成特定的格式,后者负责对输出做最后的确认并关闭输出。
    前面提到的OutputFormat的字面含义,其实就是由这个RecordWriter来实现的。
     
    而第三个方法,getOutputCommitter则 用于返回一个OutputCommitter的实例。(在Hadoop-0.20中,MapReduce有两套API。 getOutputCommitter是在NewAPI中才提供的,OldAPI里面并没有。不过OldAPI同样有OutputCommtter这个东 西,只是不能通过OutputFormat来定制而已。)
     
    OutputCommitter用于控制Job的输出环境,它有下面几个方法:
     
    void  setupJob(JobContext jobContext);
    void  commitJob(JobContext jobContext);
    void  abortJob(JobContext jobContext, JobStatus.State state);
    void  setupTask(TaskAttemptContext taskContext);
    boolean  needsTaskCommit(TaskAttemptContext taskContext);
    void  commitTask(TaskAttemptContext taskContext);
    void  abortTask(TaskAttemptContext taskContext);  

    Job开始被执行之前,框架会调用OutputCommitter.setupJob()为Job创建一个输出路径;

    如果Job成功完成,框架会调用OutputCommitter.commitJob()提交Job的输出;

    如果Job失败,框架会调用OutputCommitter.abortJob()撤销Job的输出;

    对 应于Job下的每一个Task,同样牵涉创建、提交和撤销三个动作,分别由OutputCommitter.setupTask()、 OutputCommitter.commitTask()、OutputCommitter.abortTask()来完成。而一个Task可能没有输 出,从而也就不需要提交,这个可以通过OutputCommitter.needsTaskCommit()来判断;

     

    具体OutputCommitter的这些方法里面完成了什么样的操作,这是由具体的OutputCommitter来定制的,可以任意去实现。比如,FileOutputCommitter完成了如下操作:

    setupJob - mkdir ${mapred.output.dir}/_temporary
    commitJob - touch ${mapred.output.dir}/_SUCCESS && rm -r ${mapred.output.dir}/_temporary
    abortJob - rm -r ${mapred.output.dir}/_temporary
    setupTask - <nothing>
    needsTaskCommit - test -d ${mapred.output.dir}/_temporary/_${TaskAttemptID}
    commitTask - mv ${mapred.output.dir}/_temporary/_${TaskAttemptID}/* ${mapred.output.dir}/
    abortTask - rm -r ${mapred.output.dir}/_temporary/_${TaskAttemptID}

    (注意,上面这些路径都是HDFS上的,不是某个TaskTracker本地机器上的。)

    其 中的逻辑是:Job执行的时候,Task的输出放到Output路径下的_temporary目录的以TaskAttemptID命名的子目录中。只有当 Task成功了,相应的输出才会被提交到Output路径下。而只有当整个Job都成功了,才会在Output路径下放置_SUCCESS文件。 _SUCCESS文件的存在表明了Output路径下的输出信息是正确且完整的;而如果_SUCCESS文件不存在,Output下的信息也依然是正确的 (这已经由commitTask保证了),但是不一定是完整的(可能只包含部分Reduce的输出)。

    与之对应,FileOutputFormat会让它所创建的RecordWriter将输出写到${mapred.output.dir}/_temporary/_${TaskAttemptID}/下。当然,Map和Reduce任务也可以自己向这个路径put数据。
     

    接下来就是到在哪里去执行这些方法的问题了。

    一 个Job被提交到JobTracker后会生成若干的Map和Reduce任务,这些任务会被分派到TaskTracker上。对于每一个 Task,TaskTracker会使用一个子JVM来执行它们。那么对于Task的setup/commit/abort这些操作,自然应该在执行 Task的子JVM里面去完成:

    当一个Task被关联到一个子JVM后,在任务初始化阶段,OutputCommitter.setupTask()会被调用;

    当 一个任务执行成功完成了之后,脱离子JVM之前,OutputCommitter.commitTask()会被调用。不过这里还有两个细节:1、需要先 调用OutputCommitter.needsTaskCommit()来确定是否有输出需要提交;2、提交之前还有一个同步逻辑,需要由 JobTracker同意提交后才能提交。因为Hadoop有推测执行的逻辑,一个Task可能在多个TaskTracker上同时执行,但是它们之中最 多只有一个能得到提交,否则可能导致结果的错乱;

    当 一个任务执行失败时,OutputCommitter.abortTask()会被调用。这个调用很特殊,它不大可能在执行任务的子JVM里面完成。因为 执行任务的子JVM里面跑的是用户提供的Map/Reduce代码,Hadoop框架是无法保证这些代码的稳定性的,所以任务的失败往往伴随着子JVM的 异常退出(这也就是为什么要用子JVM来执行Map和Reduce任务的原因,否则异常退出的可能就是整个框架了)。于是,对于失败的任 务,JobTracker除了要考虑它的重试之外,还要为其生成一个cleanup任务。这个cleanup任务像普通的Map和Reduce任务一样, 会被分派到TaskTracker上去执行(不一定分派到之前执行该任务失败的那个TaskTracker上,因为输出是在HDFS上,是全局的)。而它 的执行逻辑主要就是调用OutputCommitter.abortTask();

     

    而对于Job的setup/commit/abort,则显然不能使用上面的逻辑。

    从 时间上说,OutputCommitter.setupJob()应该在所有Map和Reduce任务执行之前被调用、 OutputCommitter.commitJob()应该在所有Map和Reduce任务执行之后被调用、而 OutputCommitter.abortJob()应该在Job确认失败之后被调用;

    从地点上说,可能调用这些方法的地方无外乎JobClient、JobTracker、或TaskTracker;

    JobClient 应该第一个被排除,因为Job的执行并不依赖于JobClient。JobClient在提交完Job之后就可以退出了,它的退出并不会影响Job的继续 执行(如果不退出则可以接收JobTracker的进度反馈)。所以,不可能依靠JobClient在Job成功以后来调用 OutputCommitter.commitJob();

    JobTracker 呢?貌似是个合适的地方,因为JobTracker明确知道Job的开始与结束、成功与失败。但是实际上还是不能由JobTracker来调用这些方法。 就像前面说到的OutputCommitter.abortTask()一样,既然JobTracker知道了Task的失败,却不直接为它清理输出,而 是通过生成一个对应的cleanup任务来完成清理工作。为什么要这样做呢?其实原因很简单,因为OutputCommitter是独立于Hadoop框 架,可以由用户自己定制的。Hadoop框架不能保证用户定制代码的稳定性,当然不能让它直接在JobTracker上执行。必须启动一个新的JVM来执 行这些方法,那么正好TaskTracker上已经有这样的逻辑了。

    所 以,对于Job的setup/commit/abort,跟OutputCommitter.abortTask()类似,JobTracker会生成对 应的setup任务和cleanup任务。在初始化Job的时期将Job的setup任务分派给TaskTracker,TaskTracker执行这个 setup任务所要做的事情就是调用OutputCommitter.setupJob();在Job结束时,Job的cleanup任务将分派给 TaskTracker,TaskTracker执行这个cleanup任务所要做的事情就是根据Job的执行结果是成功或是失败,来调用 OutputCommitter.commitJob()或OutputCommitter.abortJob()。

    为 了保证OutputCommitter.setupJob()在所有Map和Reduce任务执行之前被调用,在JobTracker上,Job的初始化 被分成了两个步骤:一是为Job生成一堆任务,二是将setup任务分派给TaskTracker去执行,并等待它执行完成。在这之后,初始化才算完 成,Map和Reduce任务才能得到分派。

    可见,在Job执行的过程中,除了我们关注的Map和Reduce任务之外,还会有一些隐藏的setup和cleanup任务。不过这些任务都有一个共同点,它们都可以是用户定制的。

  • 相关阅读:
    第36课 经典问题解析三
    第35课 函数对象分析
    67. Add Binary
    66. Plus One
    58. Length of Last Word
    53. Maximum Subarray
    38. Count and Say
    35. Search Insert Position
    28. Implement strStr()
    27. Remove Element
  • 原文地址:https://www.cnblogs.com/YDDMAX/p/6828363.html
Copyright © 2011-2022 走看看