zoukankan      html  css  js  c++  java
  • Hadoop OutputCommitter

    1. OutputCommitters

    MapReduce使用一个提交协议来确保作业(job)和任务(task)都完全成功或失败。这个通过 OutputCommiter来实现。

    新版本 MapReduce API中,OutputCommitter OutputFormat 通过getOutputCommitter() 方法确定。默认为FileOutputCommitter,适用于有文件输出的MapReduce任务。若是需要,也可以实现一个新的OutputCommitter类,以对作业的完成或任务做自定义设置或清理。

    OutputCommiter 部分源码如下:

    public abstract class OutputCommitter extends org.apache.hadoop.mapreduce.OutputCommitter {
       
    public OutputCommitter() {
        }

       
    public abstract void setupJob(JobContext var1) throws IOException;

       
    /** @deprecated */
       
    @Deprecated
       
    public void cleanupJob(JobContext jobContext) throws IOException {
        }

       
    public void commitJob(JobContext jobContext) throws IOException {
           
    this.cleanupJob(jobContext);
        }

       
    public void abortJob(JobContext jobContext, int status) throws IOException {
           
    this.cleanupJob(jobContext);
        }

       
    public abstract void setupTask(TaskAttemptContext var1) throws IOException;

       
    public abstract boolean needsTaskCommit(TaskAttemptContext var1) throws IOException;

       
    public abstract void commitTask(TaskAttemptContext var1) throws IOException;

       
    public abstract void abortTask(TaskAttemptContext var1) throws IOException;

    其中 setupJob在作业运行前被调用,用于初始化操作。当OutputCommitter 被设置为 FileOutputCommitter时,它会创建最终的输出目录${mapreduce.output.fileoutputformat.outputdir},并为任务的输出创建一个临时文件夹 _temporary,作为最终输出目录的子目录。

    FileOutputCommitter setupJob() 方法源码如下:

    public void setupJob(JobContext context) throws IOException {
       
    if (this.hasOutputPath()) {
            Path jobAttemptPath =
    this.getJobAttemptPath(context);
            FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration());
           
    if (!fs.mkdirs(jobAttemptPath)) {
                LOG.error(
    "Mkdirs failed to create " + jobAttemptPath);
            }
        }
    else {
            LOG.warn(
    "Output Path is null in setupJob()");
        }

    }

    其中 jobAttemptPath getJobAttemptPath(context) 获取,一层层往下查看此方法调用,最终可以看到FileOutputCommitter 创建的临时目录为:目标输出目录下的_temporary 子目录:

    private static Path getPendingJobAttemptsPath(Path out) {
       
    return new Path(out, "_temporary");
    }

    如果作业成功,则调用 commitJob() 方法。此方法会做临时文件的清理(cleanupJob()),并在最终输出目录中创建名为_SUCCESS的文件,表示Job成功执行完成。若是Job 执行失败,则被状态对象调用abortJob(),默认会调用 cleanupJob() 的方法,对临时文件进行清理。

    以上提到的是Job 级别的Committer。在 Task级别,同样也有上述几种方法:

    public abstract void setupTask(TaskAttemptContext var1) throws IOException;

    public abstract boolean needsTaskCommit(TaskAttemptContext var1) throws IOException;

    public abstract void commitTask(TaskAttemptContext var1) throws IOException;

    public abstract void abortTask(TaskAttemptContext var1) throws IOException;

    其中,在 task 执行之前会调用 setupTask(),但是默认并不做任何工作。因为创建临时任务的输出路径的工作已经在setupJob() 阶段完成。方法needsTaskCommit返回是否需要task 执行提交阶段。提交阶段的工作为:将临时目录下的输出(若有)移动到最终目录。若设置为 false,则执行框架不会为任务运行分布式提交协议,也就不会执行commitTask() abortTask()。当此task没有写任何输出时,FileOutputCommitter会跳过 commit (提交)阶段。

    如果task成功执行,并且有输出,则会调用commitTask() 方法,(默认的实现为)将临时目录下的输出文件移动到最终目录(mapreduce.output.fileoutputformat.outputdir)。若是执行失败,则调用abortTask(),删除任务输出的临时目录及文件。

    执行框架会保证一个task在有多次尝试的情况下,仅有一个task会被提交。

    2. mapreduce.fileoutputcommitter.algorithm.version 1 与 2 的区别

    FileOutputCommitter 有两个方法,commitTask 和 commitJob。Apache Spark 2.0 以及更高版本使用的是 Apache Hadoop 2。

    Apache Hadoop 2 使用 mapreduce.fileoutputcommitter.algorithm.version 控制 commitTask 和 commitJob 如何工作。

    在 Hadoop 2 中,默认的值是 1。在这种情况下,commitTask 会将 task 的输出文件从 task 的临时目录移动到 job 的临时目录下。

    在所有 task 任务完成后,commitJob 将生成的数据从 job 的临时目录移动到最终的 job 目录下。这个工作在 spark 中由 driver 完成。

    若是使用的是云存储(如 s3),则这个操作会消耗较长时间。会看到所有 task 已结束,但是任务仍未结束。

    在设置 mapreduce.fileoutputcommitter.algorithm.version 的值为 2 后,commitTask 会将 task 生成的输出文件从 task 临时目录直接移动到 job 的最终目录。

    此时,commitJob 基本无操作。

    References:

    [1] hadoop权威指南第4版 

    [2] https://docs.databricks.com/spark/latest/faq/append-slow-with-spark-2.0.0.html

  • 相关阅读:
    对比度受限的自适应直方图均衡化(CLAHE)
    双边滤波
    快速高斯滤波
    积分图像的应用(二):非局部均值去噪(NL-means)
    非局部均值去噪(NL-means)
    积分图像的应用(一):局部标准差 分类: 图像处理 Matlab 2015-06-06 13:31 137人阅读 评论(0) 收藏
    积分图像 分类: 图像处理 Matlab 2015-06-06 10:30 149人阅读 评论(0) 收藏
    双边滤波与引导滤波 分类: 图像处理 2015-04-29 14:52 48人阅读 评论(0) 收藏
    UE4射击小游戏原型
    UnrealEngine4 尝鲜
  • 原文地址:https://www.cnblogs.com/zackstang/p/10772236.html
Copyright © 2011-2022 走看看