zoukankan      html  css  js  c++  java
  • MapReduce执行过程源码分析(一)——Job任务的提交

    为了能使源码的执行过程与Hadoop权威指南(2、3版)中章节Shuffle and Sort的分析相对应,Hadoop的版本为0.20.2。

    一般情况下我们通过Job(org.apache.hadoop.mapreduce.Job)的方法waitForCompletion来开始一个Job的执行。

        /**
         * Submit the job to the cluster and wait for it to finish.
         * 
         * @param verbose
         *            print the progress to the user
         * @return true if the job succeeded
         * @throws IOException
         *             thrown if the communication with the <code>JobTracker</code>
         *             is lost
         */
        public boolean waitForCompletion(boolean verbose) throws IOException,
                InterruptedException, ClassNotFoundException {
            if (state == JobState.DEFINE) {
                submit();
            }
    
            if (verbose) {
                jobClient.monitorAndPrintJob(conf, info);
            } else {
                info.waitForCompletion();
            }
    
            return isSuccessful();
        }

    通常设置方法参数verbose为true,这样可以在控制台中看到Job的执行过程信息。

    其中Job的具体提交过程是由方法submit完成的,

        /**
         * Submit the job to the cluster and return immediately.
         * 
         * @throws IOException
         */
        public void submit() throws IOException, InterruptedException,
                ClassNotFoundException {
            ensureState(JobState.DEFINE);
    
            setUseNewAPI();
    
            info = jobClient.submitJobInternal(conf);
    
            state = JobState.RUNNING;
        }

    而submit方法的执行又依赖于JobClient submitJobInternal来完成,方法submitJobInternal是Job任务提交过程中的重点,在方法中完成的Job任务的初始化准备工作。

        /**
         * Internal method for submitting jobs to the system.
         * 
         * @param job
         *            the configuration to submit
         * @return a proxy object for the running job
         * @throws FileNotFoundException
         * @throws ClassNotFoundException
         * @throws InterruptedException
         * @throws IOException
         */
        public RunningJob submitJobInternal(JobConf job)
                throws FileNotFoundException, ClassNotFoundException,
                InterruptedException, IOException {
            /*
             * configure the command line options correctly on the submitting dfs
             */
            JobID jobId = jobSubmitClient.getNewJobId();
    
            /*
             * 在submitJobDir目录下有三个文件:job.jar、job.split、job.xml
             * 
             * **********************************************************************
             */
            Path submitJobDir = new Path(getSystemDir(), jobId.toString());
            Path submitJarFile = new Path(submitJobDir, "job.jar");
            Path submitSplitFile = new Path(submitJobDir, "job.split");
    
            configureCommandLineOptions(job, submitJobDir, submitJarFile);
    
            Path submitJobFile = new Path(submitJobDir, "job.xml");
    
            /*
             * 获取reducer的数目
             * 
             * **********************************************************************
             */
            int reduces = job.getNumReduceTasks();
    
            JobContext context = new JobContext(job, jobId);
    
            /*
             * Check the output specification
             * 
             * 根据是否使用New API验证OutputFormat
             * 
             * 如输出格式设置(未设置默认为TextOutputFormat)、是否设置输出路径及输出路径是否已经存在
             * 
             * **********************************************************************
             */
            if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
                org.apache.hadoop.mapreduce.OutputFormat<?, ?> output = ReflectionUtils
                        .newInstance(context.getOutputFormatClass(), job);
    
                output.checkOutputSpecs(context);
            } else {
                job.getOutputFormat().checkOutputSpecs(fs, job);
            }
    
            /*
             * Create the splits for the job
             * 
             * *******************************************************************
             */
            LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
    
            /*
             * 根据输入切片的数目决定map任务的数目
             * 
             * 一个输入切片对应一个map
             * 
             * *******************************************************************
             */
            int maps;
    
            if (job.getUseNewMapper()) {
                maps = writeNewSplits(context, submitSplitFile);
            } else {
                maps = writeOldSplits(job, submitSplitFile);
            }
    
            job.set("mapred.job.split.file", submitSplitFile.toString());
    
            job.setNumMapTasks(maps);
    
            /*
             * Write job file to JobTracker's fs
             * 
             * **********************************************************************
             */
            FSDataOutputStream out = FileSystem.create(fs, submitJobFile,
                    new FsPermission(JOB_FILE_PERMISSION));
    
            try {
                job.writeXml(out);
            } finally {
                out.close();
            }
    
            /*
             * Now, actually submit the job (using the submit name)
             * 
             * **********************************************************************
             */
            JobStatus status = jobSubmitClient.submitJob(jobId);
            
            if (status != null) {
                return new NetworkedJob(status);
            } else {
                throw new IOException("Could not launch job");
            }
        }

    下面对该方法内部的执行流程进行详细分析:

    (1)生成Job ID

    JobID jobId = jobSubmitClient.getNewJobId();

    (2)目录相关及文件

    在submitJobDir目录下有三个文件:job.jar、job.split、job.xml,其中

    job.jar:Job相关类(资源)的一个Jar包;

    job.split:Job的输入文件(可能有多个或可以是其它格式(如HBase HTable))会根据一定的条件进行切片,每一个切片中的“数据”会对应的Job的一个Map任务,即每一个Map仅处理某一个切片中的“数据”;

    job.xml:用以保存Job相关的配置数据。

            /*
             * 在submitJobDir目录下有三个文件:job.jar、job.split、job.xml
             * 
             * **********************************************************************
             */
            Path submitJobDir = new Path(getSystemDir(), jobId.toString());
            Path submitJarFile = new Path(submitJobDir, "job.jar");
            Path submitSplitFile = new Path(submitJobDir, "job.split");
    
            /*
             * 根据命令行参数-libjars, -files, -archives对Job进行相应的配置
             */
            configureCommandLineOptions(job, submitJobDir, submitJarFile);
    
            Path submitJobFile = new Path(submitJobDir, "job.xml");

    其中,configureCommandLineOptions主要是根据用户在命令行环境下提供的参数(-libjars、-files、-archives)进行DistributedCache的设置,并将相应的Jar拷贝至目录submitJobDir中。

    注:DistributedCache的相关知识会在后续分析,在此先不进行讨论。

    (3)Reducer数目

    获取用户所设置的该Job中Reducer的数目。

    int reduces = job.getNumReduceTasks();

    (4)JobContext

    JobContext context = new JobContext(job, jobId);

    其实JobContext就是对JobConf与JobID的封装。

    (5)Job输出相关校验

            /*
             * Check the output specification
             * 
             * 根据是否使用New API验证OutputFormat
             * 
             * 如输出格式设置(未设置默认为TextOutputFormat)、是否设置输出路径及输出路径是否已经存在
              * 
             * **********************************************************************
             */
            if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
                org.apache.hadoop.mapreduce.OutputFormat<?, ?> output = ReflectionUtils
                        .newInstance(context.getOutputFormatClass(), job);
    
                output.checkOutputSpecs(context);
            } else {
                job.getOutputFormat().checkOutputSpecs(fs, job);
            }

    校验时会根据是否使用新版本的API分为两种情况,默认情况下使用的都是新版本的API,所以此处不考虑旧版本API的情况,所以分析的代码变为

     

                org.apache.hadoop.mapreduce.OutputFormat<?, ?> output = ReflectionUtils
                        .newInstance(context.getOutputFormatClass(), job);
    
                output.checkOutputSpecs(context);

    首先,获取输出的具体格式;

    context.getOutputFormatClass()

     

        /**
         * Get the {@link OutputFormat} class for the job.
         * 
         * @return the {@link OutputFormat} class for the job.
         */
        @SuppressWarnings("unchecked")
        public Class<? extends OutputFormat<?, ?>> getOutputFormatClass()
                throws ClassNotFoundException {
            return (Class<? extends OutputFormat<?, ?>>) conf.getClass(
                    OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class);
        }

    由上述代码可以看出,如果用户并没有明确指定输出格式类型,则默认使用TextOutputFormat。

    注:文本是进行数据分析时经常使用的一种格式,因此本文主要使用TextInputFormat、TextOutputFormat进行讲解。

    然后,通过反射将输出格式实例化;

    org.apache.hadoop.mapreduce.OutputFormat<?, ?> output = ReflectionUtils
                        .newInstance(context.getOutputFormatClass(), job);

    最后,通过输出格式的具体类型进行校验,包括两个部分:是否设置输出目录及输出目录是否已经存在。

    TextOutputFormat的checkOutputSpecs继承自它的父类FileOutputFormat。

         public void checkOutputSpecs(JobContext job)
                throws FileAlreadyExistsException, IOException {
            // Ensure that the output directory is set and not already there
            Path outDir = getOutputPath(job);
    
            if (outDir == null) {
                throw new InvalidJobConfException("Output directory not set.");
            }
    
            if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
                throw new FileAlreadyExistsException("Output directory " + outDir
                        + " already exists");
            }
        }

    (6)生成输入切片(Split),并设置Map的数目;

            /*
             * Create the splits for the job
             * 
             * *******************************************************************
             */
            LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
    
            /*
             * 根据输入切片的数目决定map任务的数目
              * 
             * 一个输入切片对应一个map
             * 
             * *******************************************************************
             */
            int maps;
    
            if (job.getUseNewMapper()) {
                maps = writeNewSplits(context, submitSplitFile);
            } else {
                maps = writeOldSplits(job, submitSplitFile);
            }
    
            job.set("mapred.job.split.file", submitSplitFile.toString());
    
            job.setNumMapTasks(maps);

    这里仅分析新版本API下的writeNewSplits,该方法需要两个参数:JobContext及切片文件的Path。

        @SuppressWarnings("unchecked")
        private <T extends org.apache.hadoop.mapreduce.InputSplit> int writeNewSplits(
                JobContext job, Path submitSplitFile) throws IOException,
                InterruptedException, ClassNotFoundException {
            JobConf conf = job.getJobConf();
    
            /*
             * 创建InputFormat实例
             * 
             * 不同的InputFormat实例获取Split的方式不同
             * 
             * ******************************************************************
             */
            org.apache.hadoop.mapreduce.InputFormat<?, ?> input = ReflectionUtils
                    .newInstance(job.getInputFormatClass(), job.getJobConf());
    
            /*
             * 获取输入文件对应的切片记录
             * 
             * ******************************************************************
             */
            List<org.apache.hadoop.mapreduce.InputSplit> splits = input
                    .getSplits(job);
    
            T[] array = (T[]) splits
                    .toArray(new org.apache.hadoop.mapreduce.InputSplit[splits
                            .size()]);
    
            /*
             * sort the splits into order based on size, so that the biggest go
             * first
             * 
             * ******************************************************************
             */
            Arrays.sort(array, new NewSplitComparator());
    
            /*
             * 写出SplitFile
             * 
             * ******************************************************************
             */
    
            // 打开切片文件输出流,并写出头信息(头、版本号、切片数目)
            DataOutputStream out = writeSplitsFileHeader(conf, submitSplitFile,
                    array.length);
    
            try {
                if (array.length != 0) {
                    DataOutputBuffer buffer = new DataOutputBuffer();
    
                    RawSplit rawSplit = new RawSplit();
    
                    SerializationFactory factory = new SerializationFactory(conf);
    
                    Serializer<T> serializer = factory
                            .getSerializer((Class<T>) array[0].getClass());
    
                    serializer.open(buffer);
    
                    for (T split : array) {
                        rawSplit.setClassName(split.getClass().getName());
    
                        buffer.reset();
    
                        // 序列化文件名、起始位置、切片长度、主机位置(多个)
                        serializer.serialize(split);
    
                        rawSplit.setDataLength(split.getLength());
    
                        rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
    
                        rawSplit.setLocations(split.getLocations());
    
                        rawSplit.write(out);
                    }
    
                    serializer.close();
                }
            } finally {
                out.close();
            }
    
            return array.length;
        }

    方法思路:根据指定的输入格式类型(InputFormat)对输入文件进行切片,并将切片信息保存至指定的切片文件中。

    注:切片并不是对输入文件进行物理上的切割,而只是一种逻辑上的“分割”,即将输入文件某个片段的起始位置保存下来,后期Map任务运行时根据切片文件就可以将该片段作为输入进行处理。

    首先,获取输入格式类型,

    job.getInputFormatClass()

     

    /**
         * Get the {@link InputFormat} class for the job.
         * 
         * @return the {@link InputFormat} class for the job.
         */
        @SuppressWarnings("unchecked")
        public Class<? extends InputFormat<?, ?>> getInputFormatClass()
                throws ClassNotFoundException {
            return (Class<? extends InputFormat<?, ?>>) conf.getClass(
                    INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
        }

    与输出格式类型相同,如果用户没有特殊指定,默认的输入格式类型为TextInputFormat,然后将此输入格式类型实例化。

    org.apache.hadoop.mapreduce.InputFormat<?, ?> input = ReflectionUtils
                    .newInstance(job.getInputFormatClass(), job.getJobConf());

    然后,根据具体的输入格式类型计算切片信息,

            /*
             * 获取输入文件对应的切片记录
              * 
             * ******************************************************************
             */
            List<org.apache.hadoop.mapreduce.InputSplit> splits = input
                    .getSplits(job);

    TextInputFormat的方法getSplits继承自它的父类FileInputFormat。

        /**
         * Generate the list of files and make them into FileSplits.
         */
        public List<InputSplit> getSplits(JobContext job) throws IOException {
            /*
             * 计算Split的最小值与最大值
             * 
             * ********************************************************************
             */
            long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
            long maxSize = getMaxSplitSize(job);
    
            // generate splits
            List<InputSplit> splits = new ArrayList<InputSplit>();
    
            /*
             * 逐个处理InputPaths中的文件
             * 
             * *******************************************************************
             */
            for (FileStatus file : listStatus(job)) {
                Path path = file.getPath();
    
                FileSystem fs = path.getFileSystem(job.getConfiguration());
    
                /*
                 * 获取特定文件的长度
                 * 
                 * ******************************************************************
                 */
                long length = file.getLen();
    
                /*
                 * 获取特定文件对应的块Block信息
                 * 
                 * ***************************************************************
                 */
                BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0,
                        length);
    
                /*
                 * 如果文件长度大于0且是可切片的
                 * 
                 * ***************************************************************
                 */
                if ((length != 0) && isSplitable(job, path)) {
                    long blockSize = file.getBlockSize();
    
                    /*
                     * 根据blockSize、minSize、maxSize计算切片大小
                     * 
                     * Math.max(minSize, Math.min(maxSize, blockSize)
                     * 
                     * ***********************************************************
                     */
                    long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    
                    long bytesRemaining = length;
    
                    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                        /*
                         * 返回的Block Index为此切片开始位置所在Block的Index
                         * 
                         * **********************************************************
                         */
                        int blkIndex = getBlockIndex(blkLocations, length
                                - bytesRemaining);
    
                        /*
                         * 一个Block对应一个FileSplit
                         * 
                         * *******************************************************
                         */
                        splits.add(new FileSplit(path, length - bytesRemaining,
                                splitSize, blkLocations[blkIndex].getHosts()));
    
                        bytesRemaining -= splitSize;
                    }
    
                    if (bytesRemaining != 0) {
                        /*
                         * 剩余的文件数据形成一个切片,hosts为此文件最后一个Block的hosts
                         * 
                         * **********************************************************
                         */
                        splits.add(new FileSplit(path, length - bytesRemaining,
                                bytesRemaining,
                                blkLocations[blkLocations.length - 1].getHosts()));
                    }
                } else if (length != 0) {
                    /*
                     * 文件长度不为0但不可分割
                     * 
                     * 不能切片的文件,整体形成一个切片,hosts为此文件第一个Block的hosts
                     * 
                     * ***********************************************************
                     */
                    splits.add(new FileSplit(path, 0, length, blkLocations[0]
                            .getHosts()));
                } else {
                    // Create empty hosts array for zero length files
                    splits.add(new FileSplit(path, 0, length, new String[0]));
                }
            }
    
            LOG.debug("Total # of splits: " + splits.size());
    
            return splits;
        }

    getSplits处理流程如下:

    ① 根据配置参数计算Split所允许的最小值与最大值,为后期确定Split的长度提供参考;

            /*
             * 计算Split的最小值与最大值
              * 
             * ********************************************************************
             */
            long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
            long maxSize = getMaxSplitSize(job);

    ② 在内存中创建相应的数据结构,用以保存计算所得的切片信息;

            // generate splits
            List<InputSplit> splits = new ArrayList<InputSplit>();

    ③ 循环处理InputPaths所添加的文件,对一个文件各自计算其对应的切片信息;

            /*
             * 逐个处理InputPaths中的文件
              * 
             * *******************************************************************
             */
            for (FileStatus file : listStatus(job)) {
                ......
            }

    ④ 计算某个文件的切片信息:

    a. 获取该文件的长度及对应的Block信息;

                Path path = file.getPath();
    
                FileSystem fs = path.getFileSystem(job.getConfiguration());
    
                /*
                 * 获取特定文件的长度
                  * 
                 * ******************************************************************
                 */
                long length = file.getLen();
    
                /*
                 * 获取特定文件对应的块Block信息
                  * 
                 * ***************************************************************
                 */
                BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0,
                        length);

    b. 根据文件长度以及该文件是否可以切片,分为三种情况处理:

    其中,文件是否支持Split,是由该文件类型对应的InputFormat决定的,FileInputFormat中的实现如下:

        /**
         * Is the given filename splitable? Usually, true, but if the file is stream
         * compressed, it will not be.
         * 
         * <code>FileInputFormat</code> implementations can override this and return
         * <code>false</code> to ensure that individual input files are never
         * split-up so that {@link Mapper}s process entire files.
         * 
         * @param context
         *            the job context
         * @param filename
         *            the file name to check
         * @return is this file splitable?
         */
        protected boolean isSplitable(JobContext context, Path filename) {
            return true;
        }

    TextInputFormat中重写了该方法:

    @Override
        protected boolean isSplitable(JobContext context, Path file) {
            CompressionCodec codec = new CompressionCodecFactory(
                    context.getConfiguration()).getCodec(file);
            
            return codec == null;
        }

    由上述代码可见,如果文本文件经过相应的压缩之后,是不支持进行Split的。

    第一种情况:文件长度大于0,且文件支持Split;

    首先计算一个切片的具体长度,长度的计算方式为:Math.max(minSize, Math.min(maxSize, blockSize) ;

                    long blockSize = file.getBlockSize();
    
                    /*
                     * 根据blockSize、minSize、maxSize计算切片大小
                       * 
                     * Math.max(minSize, Math.min(maxSize, blockSize)
                     * 
                     * ***********************************************************
                     */
                    long splitSize = computeSplitSize(blockSize, minSize, maxSize);

    然后,根据splitSize进行切片,思路就是从文件开始处,以splitSize为区间,对文件进行逻辑上的切分;

                    long bytesRemaining = length;
    
                    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                        /*
                         * 返回的Block Index为此切片开始位置所在Block的Index
                         * 
                         * **********************************************************
                         */
                        int blkIndex = getBlockIndex(blkLocations, length
                                - bytesRemaining);
    
                        /*
                         * 一个Block对应一个FileSplit
                         * 
                         * *******************************************************
                         */
                        splits.add(new FileSplit(path, length - bytesRemaining,
                                splitSize, blkLocations[blkIndex].getHosts()));
    
                        bytesRemaining -= splitSize;
                    }
    
                    if (bytesRemaining != 0) {
                        /*
                         * 剩余的文件数据形成一个切片,hosts为此文件最后一个Block的hosts
                         * 
                         * **********************************************************
                         */
                        splits.add(new FileSplit(path, length - bytesRemaining,
                                bytesRemaining,
                                blkLocations[blkLocations.length - 1].getHosts()));
                    }

    为了不产生过小的切片,要求尚未进行切片的文件部分长度(bytesRemaining)大于切片长度(splitSize)的SPLIT_SLOP(1.1)倍,然后将文件的剩余部分直接作为一个切片。

    在上述代码中的切片信息中,还保存着切片对应的Block信息,注意切片并不一定会与Block完全吻合(即切片在文件中的起止处与该Block在文件中的起止处一致),所谓的对应的Block,是指该切片的起始处正在落在该Block的区间内;之所以要保存切片对应的Block信息,是为后期Map任务的“本地计算”调度运行作准备的。

    第二种情况:文件长度大于0,但该文件不支持切片;

                    /*
                     * 文件长度不为0但不可分割
                       * 
                     * 不能切片的文件,整体形成一个切片,hosts为此文件第一个Block的hosts
                     * 
                     * ***********************************************************
                     */
                    splits.add(new FileSplit(path, 0, length, blkLocations[0]
                            .getHosts()));

    因为该文件不支持切片,直接将整个文件作为一个切片就可以了。

    第三种情况:文件长度为0;

                    // Create empty hosts array for zero length files
                    splits.add(new FileSplit(path, 0, length, new String[0]));

    此时直接创建一个空的切片即可。

    到此,所有输入文件的切片信息就全部产生完毕了。

    ⑤ 对产生的切片进行排序处理,排序的依据是切片的大小,切片越大,在切片集合中的位置应该更靠前,这样可以使大的切片在调度时,优先得到处理。

            T[] array = (T[]) splits
                    .toArray(new org.apache.hadoop.mapreduce.InputSplit[splits
                            .size()]);
    
            /*
             * sort the splits into order based on size, so that the biggest go
             * first
             * 
             * ******************************************************************
             */
            Arrays.sort(array, new NewSplitComparator());

    ⑥ 存储切片信息至相应的切片文件中,调度任务时使用切片文件中的信息进行调度,具体的存储过程不影响整个处理流程的理解,在此不对它进行分析。

    至此,writeNewSplits方法结果,该方法还回返回切片的总数目,即对应着Job的Map任务数目。

    (7)将Job的相关信息写入job.xml;

            /*
             * Write job file to JobTracker's fs
             * 
             * **********************************************************************
             */
            FSDataOutputStream out = FileSystem.create(fs, submitJobFile,
                    new FsPermission(JOB_FILE_PERMISSION));
    
            try {
                job.writeXml(out);
            } finally {
                out.close();
            }

    (8)完成Job任务的实际提交;

            /*
             * Now, actually submit the job (using the submit name)
             * 
             * **********************************************************************
             */
            JobStatus status = jobSubmitClient.submitJob(jobId);
    
            if (status != null) {
                return new NetworkedJob(status);
            } else {
                throw new IOException("Could not launch job");
            }

    到此,Job任务的提交过程分析完毕。

  • 相关阅读:
    多层装饰器执行顺序
    flask之 中间件 蓝图 falsk请求上下文 rquirements.txt threading.local 偏函数
    flask 之 在flask中使用websocket
    flask 之项目分文件使用sqlalchemy+flask-migrate djagno多数据库
    flask之六 sqlachemy详解 scoped_session线程安全 基本增删改查 多对多关系建立和操作 flask-sqlalchemy的使用
    远程连接linux开发项目
    INT104-lab9
    INT104-lab8
    INT104-lab7
    Java-数据结构-泛型BST-CPT102-tutorial Week6
  • 原文地址:https://www.cnblogs.com/yurunmiao/p/3557076.html
Copyright © 2011-2022 走看看