zoukankan      html  css  js  c++  java
  • Mapreduce运行过程分析(基于Hadoop2.4)——(一)

    1 概述

    该瞅瞅MapReduce的内部执行原理了,曾经仅仅知道个皮毛,再不搞搞,不然怎么死的都不晓得。下文会以2.4版本号中的WordCount这个经典样例作为分析的切入点。一步步来看里面究竟是个什么情况。

    2 为什么要使用MapReduce

    Map/Reduce。是一种模式,适合解决并行计算的问题,比方TopN、贝叶斯分类等。

    注意。是并行计算,而非迭代计算,像涉及到层次聚类的问题就不太适合了。

    从名字能够看出,这样的模式有两个步骤,Map和Reduce。

    Map即数据的映射,用于把一组键值对映射成还有一组新的键值对,而Reduce这个东东,以Map阶段的输出结果作为输入。对数据做化简、合并等操作。

    而MapReduce是Hadoop生态系统中基于底层HDFS的一个计算框架,它的上层又能够是Hive、Pig等数据仓库框架。也能够是Mahout这种数据挖掘工具。

    因为MapReduce依赖于HDFS。其运算过程中的数据等会保存到HDFS上,把对数据集的计算分发给各个节点,并将结果进行汇总,再加上各种状态汇报、心跳汇报等。其仅仅适合做离线计算。和实时计算框架Storm、Spark等相比,速度上没有优势。

    旧的Hadoop生态差点儿是以MapReduce为核心的,可是慢慢的发展,其扩展性差、资源利用率低、可靠性等问题都越来越让人认为不爽,于是才产生了Yarn这个新的东东,而且二代版的Hadoop生态都是以Yarn为核心。

    Storm、Spark等都能够基于Yarn使用。

    3 怎么执行MapReduce

    明确了哪些地方能够使用这个牛叉的MapReduce框架,那该怎么用呢?Hadoop的MapReduce源代码给我们提供了范例,在其hadoop-mapreduce-examples子project中包括了MapReduce的Java版样例。在写完类似的代码后。打包成jar。在HDFS的client执行:

    bin/hadoop jar mapreduce_examples.jar mainClass args

    就可以。当然。也能够在IDE(如Eclipse)中。进行远程执行、调试程序。

    至于,HadoopStreaming方式。网上有非常多。我们这里仅仅讨论Java的实现。

    4 怎样编写MapReduce程序

        如前文所说。MapReduce中有Map和Reduce。在实现MapReduce的过程中,主要分为这两个阶段,分别以两类函数进行展现。一个是map函数。一个是reduce函数。map函数的參数是一个<key,value>键值对,其输出结果也是键值对,reduce函数以map的输出作为输入进行处理。

    4.1 代码构成

        实际的代码中。须要三个元素。各自是Map、Reduce、执行任务的代码。这里的Map类是继承了org.apache.hadoop.mapreduce.Mapper,并实现当中的map方法;而Reduce类是继承了org.apache.hadoop.mapreduce.Reducer,实现当中的reduce方法。

    至于执行任务的代码,就是我们程序的入口。

        以下是Hadoop提供的WordCount源代码。

    /**
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package org.apache.hadoop.examples;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class WordCount {
    
      public static class TokenizerMapper 
           extends Mapper<Object, Text, Text, IntWritable>{
        
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
          
        public void map(Object key, Text value, Context context
                        ) throws IOException, InterruptedException {
          StringTokenizer itr = new StringTokenizer(value.toString());
          while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
          }
        }
      }
      
      public static class IntSumReducer 
           extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();
    
        public void reduce(Text key, Iterable<IntWritable> values, 
                           Context context
                           ) throws IOException, InterruptedException {
          int sum = 0;
          for (IntWritable val : values) {
            sum += val.get();
          }
          result.set(sum);
          context.write(key, result);
        }
      }
    
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
          System.err.println("Usage: wordcount <in> <out>");
          System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }
    


    4.2 入口类

    4.2.1 參数获取

    首先定义配置文件类Configuration。此类是Hadoop各个模块的公共使用类,用于载入类路径下的各种配置文件。读写当中的配置选项。

        第二步中,用到了GenericOptionsParser类,其目的是将命令行中參数自己主动设置到变量conf中。

        GenericOptionsParser的构造方法进去之后。会进行到parseGeneralOptions,对传入的參数进行解析:

     1 private void parseGeneralOptions(Options opts, Configuration conf,
     2 
     3       String[] args) throws IOException {
     4 
     5     opts = buildGeneralOptions(opts);
     6 
     7     CommandLineParser parser = new GnuParser();
     8 
     9     try {
    10 
    11       commandLine = parser.parse(opts, preProcessForWindows(args), true);
    12 
    13       processGeneralOptions(conf, commandLine);
    14 
    15     } catch(ParseException e) {
    16 
    17       LOG.warn("options parsing failed: "+e.getMessage());
    18 
    19  
    20 
    21       HelpFormatter formatter = new HelpFormatter();
    22 
    23       formatter.printHelp("general options are: ", opts);
    24 
    25     }
    26 
    27   }

     

       而getRemainingArgs方法会获得传入的參数,接着在main方法中会进行推断參数的个数,因为此处是WordCount计算。仅仅须要传入文件的输入路径和输出路径就可以,因此參数的个数为2。否则将退出:

    1 if (otherArgs.length != 2) {
    2 
    3       System.err.println("Usage: wordcount <in> <out>");
    4 
    5       System.exit(2);
    6 
    7 }

     

    假设在代码执行的时候传入其它的參数,比方指定reduce的个数。能够依据GenericOptionsParser的命令行格式这么写:

    bin/hadoop jar MyJob.jar com.xxx.MyJobDriver -Dmapred.reduce.tasks=5

    其规则是-D加MapReduce的配置选项,当然还支持-fs等其它參数传入

    当然。默认情况下Reduce的数目为1。Map的数目也为1

    4.2.2 Job定义

       定义Job对象,其构造方法为:

    1 public Job(Configuration conf, String jobName) throws IOException {
    2 
    3     this(conf);
    4 
    5     setJobName(jobName);
    6 
    7   }

     

    可见,传入的"word count"就是Job的名字。而conf被传递给了JobConf进行环境变量的获取:

     1 public JobConf(Configuration conf) {
     2 
     3     super(conf);    
     6 
     7     if (conf instanceof JobConf) {
     8 
     9       JobConf that = (JobConf)conf;
    10 
    11       credentials = that.credentials;
    12 
    13     }
    14      checkAndWarnDeprecation(); 
    19   }

     

        Job已经实例化了,以下就得给这个Job加点佐料才干让它依照我们的要求执行。于是依次给Job加入启动Jar包、设置Mapper类、设置合并类、设置Reducer类、设置输出键类型、设置输出值的类型。

        这里有必要说下设置Jar包的这种方法setJarByClass:

    1 public void setJarByClass(Class<?> cls) {
    2 
    3     ensureState(JobState.DEFINE);
    4 
    5     conf.setJarByClass(cls);
    6 
    7   }

     

    它会首先推断当前Job的状态是否是执行中,接着通过class找到其所属的jar文件,将jar路径赋值给mapreduce.job.jar属性。至于寻找jar文件的方法,则是通过classloader获取类路径下的资源文件。进行循环遍历。

    详细实现见ClassUtil类中的findContainingJar方法。

        搞完了上面的东西。紧接着就会给mapreduce.input.fileinputformat.inputdir參数赋值,这是Job的输入路径,还有mapreduce.input.fileinputformat.inputdir,这是Job的输出路径。

    详细的位置。就是我们前面main中传入的Args。

    4.2.3 Job提交

        万事俱备。那就执行吧。

        这里调用的方法例如以下:

     1 public boolean waitForCompletion(boolean verbose
     2 
     3                                    ) throws IOException, InterruptedException,
     4 
     5                                             ClassNotFoundException {
     6 
     7     if (state == JobState.DEFINE) {
     8 
     9       submit();
    10 
    11     }
    12 
    13     if (verbose) {
    14 
    15       monitorAndPrintJob();
    16 
    17     } else {
    18 
    19       // get the completion poll interval from the client.
    20 
    21       int completionPollIntervalMillis =
    22 
    23         Job.getCompletionPollInterval(cluster.getConf());
    24 
    25       while (!isComplete()) {
    26 
    27         try {
    28 
    29           Thread.sleep(completionPollIntervalMillis);
    30 
    31         } catch (InterruptedException ie) {
    32 
    33         }
    34 
    35       }
    36 
    37     }
    38 
    39     return isSuccessful();
    40 
    41   }

     

    至于方法的參数verbose,假设想在控制台打印当前的进度,则设置为true。

       至于submit方法,假设当前在HDFS的配置文件里配置了mapreduce.framework.name属性为“yarn”的话,会创建一个YARNRunner对象来进行任务的提交。其构造方法例如以下:

     1 public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
     2 
     3       ClientCache clientCache) {
     4 
     5     this.conf = conf;
     6 
     7     try {
     8 
     9       this.resMgrDelegate = resMgrDelegate;
    10 
    11       this.clientCache = clientCache;
    12 
    13       this.defaultFileContext = FileContext.getFileContext(this.conf);
    14 
    15     } catch (UnsupportedFileSystemException ufe) {
    16 
    17       throw new RuntimeException("Error in instantiating YarnClient", ufe);
    18 
    19     }
    20 
    21   }

     

    当中。ResourceMgrDelegate实际上ResourceManager的代理类。事实上现了YarnClient接口,通过ApplicationClientProtocol代理直接向RM提交Job,杀死Job,查看Job执行状态等操作。

    同一时候,在ResourceMgrDelegate类中会通过YarnConfiguration来读取yarn-site.xml、core-site.xml等配置文件里的配置属性。

       以下就到了client最关键的时刻了,提交Job到集群执行。详细实现类是JobSubmitter类中的submitJobInternal方法。

    这个牛气哄哄的方法写了100多行。还不算其几十行的凝视。我们看它干了点啥。

    Step1:

    检查job的输出路径是否存在,假设存在则抛出异常。

    Step2:

    初始化用于存放Job相关资源的路径。注意此路径的构造方式为:

    1 conf.get(MRJobConfig.MR_AM_STAGING_DIR,
    2 
    3         MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
    4 
    5         + Path.SEPARATOR + user
    6 
    7 + Path.SEPARATOR + STAGING_CONSTANT

     

    当中。MRJobConfig.DEFAULT_MR_AM_STAGING_DIR为“/tmp/hadoop-yarn/staging”。STAGING_CONSTANT为".staging"。

    Step3:

    设置client的host属性:mapreduce.job.submithostname和mapreduce.job.submithostaddress

    Step4:

    通过RPC,向Yarn的ResourceManager申请JobID对象。

    Step5:

    从HDFS的NameNode获取验证用的Token,并将其放入缓存。

    Step6:

    将作业文件上传到HDFS。这里假设我们前面没有对Job命名的话,默认的名称就会在这里设置成jar的名字。而且,作业默认的副本数是10,假设属性mapreduce.client.submit.file.replication没有被设置的话。

    Step7:

    文件上传到HDFS之后,还要被DistributedCache进行缓存起来。这是由于计算节点收到该作业的第一个任务后,就会有DistributedCache自己主动将作业文件Cache到节点本地文件夹下。而且会对压缩文件进行解压,如:.zip。.jar。.tar等等。然后開始任务。

    最后,对于同一个计算节点接下来收到的任务。DistributedCache不会反复去下载作业文件,而是直接执行任务。

    假设一个作业的任务数非常多,这样的设计避免了在同一个节点上对用一个job的文件会下载多次。大大提高了任务执行的效率。

    Step8:

    对每一个输入文件进行split划分。

    注意这仅仅是个逻辑的划分,不是物理的。由于此处是输入文件,因此运行的是FileInputFormat类中的getSplits方法。仅仅有非压缩的文件和几种特定压缩方式压缩后的文件才分片。分片的大小由例如以下几个參数决定:mapreduce.input.fileinputformat.split.maxsizemapreduce.input.fileinputformat.split.minsize、文件的块大小。

    详细计算方式为:

    Math.max(minSize, Math.min(maxSize, blockSize))

    分片的大小有可能比默认块大小64M要大。当然也有可能小于它。默认情况下分片大小为当前HDFS的块大小,64M。

       接下来就该正儿八经的获取分片详情了。

    代码例如以下:

     1           long bytesRemaining = length; 2 
     3           while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
     4 
     5             int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
     6 
     7             splits.add(makeSplit(path, length-bytesRemaining, splitSize, 
     9                                      blkLocations[blkIndex].getHosts()));
    10 
    11             bytesRemaining -= splitSize; 
    13           }  
    15 
    16           if (bytesRemaining != 0) { 
    18             int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
    19 
    20             splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, 
    22                        blkLocations[blkIndex].getHosts()));
    23 
    24           }

     

    Step8.1:

       将bytesRemaining(剩余未分片字节数)设置为整个文件的长度。

    Step8.2:

    假设bytesRemaining超过分片大小splitSize一定量才会将文件分成多个InputSplit,SPLIT_SLOP(默认1.1)。接着就会运行例如以下方法获取block的索引。当中第二个參数是这个block在整个文件里的偏移量,在循环中会从0越来越大:

     1 protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
     4     for (int i = 0 ; i < blkLocations.length; i++) {
     5       // is the offset inside this block?
     6       if ((blkLocations[i].getOffset() <= offset) &&
     7           (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
     8         return i;
     9       }
    10     }
    11 
    12     BlockLocation last = blkLocations[blkLocations.length -1];
    13     long fileLength = last.getOffset() + last.getLength() -1;
    14     throw new IllegalArgumentException("Offset " + offset + " is outside of file (0.." + fileLength + ")");
    17   }

     

    将符合条件的块的索引相应的block信息的主机节点以及文件的路径名、開始的偏移量、分片大小splitSize封装到一个InputSplit中增加List<InputSplit> splits。

    Step8.3:

    bytesRemaining -= splitSize改动剩余字节大小。剩余假设bytesRemaining还不为0,表示还有未分配的数据,将剩余的数据及最后一个block增加splits。

    Step8.4

    假设不同意切割isSplitable==false,则将第一个block、文件文件夹、開始位置为0,长度为整个文件的长度封装到一个InputSplit,增加splits中;假设文件的长度==0,则splits.add(new FileSplit(path, 0, length, new String[0]))没有block。而且初始和长度都为0。

    Step8.5

    将输入文件夹下文件的个数赋值给 "mapreduce.input.num.files",方便以后校对,返回分片信息splits。

      这就是getSplits获取分片的过程。

    当使用基于FileInputFormat实现InputFormat时。为了提高MapTask的数据本地性,应尽量使InputSplit大小与block大小同样。

      假设分片大小超过bolck大小,可是InputSplit中的封装了单个block的所在主机信息啊,这样能读取多个bolck数据吗?

    比方当前文件非常大,1G。我们设置的最小分片是100M,最大是200M,当前块大小为64M。经过计算后的实际分片大小是100M,这个时候第二个分片中存放的也仅仅是一个block的host信息。

    须要注意的是split是逻辑分片。不是物理分片,当Map任务须要的数据本地性发挥作用时,会从本机的block開始读取。超过这个block的部分可能不在本机,这就须要从别的DataNode拉数据过来,由于实际获取数据是一个输入流,这个输入流面向的是整个文件,不受split的影响,split的大小越大可能须要从别的节点拉的数据越多。从从而效率也会越慢,拉数据的多少是由getSplits方法中的splitSize决定的。所以为了更有效率,分片的大小尽量保持在一个block大小吧。

    Step9:

    将split信息和SplitMetaInfo都写入HDFS中。用法:

    1 JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);

     

    Step10:

    对Map数目设置,上面获得到的split的个数就是实际的Map任务的数目。

    Step11:

    相关配置写入到job.xml中:

    1 jobCopy.writeXml(out);

     

    Step12:

    通过例如以下代码正式提交Job到Yarn:

    1 status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

     

       这里就涉及到YarnClient和RresourceManager的RPC通信了。

    包含获取applicationId、进行状态检查、网络通信等。

    Step13:

    上面通过RPC的调用。最后会返回一个JobStatus对象,它的toString方法能够在JobClient端打印执行的相关日志信息。

    4.2.4 还有一种执行方式

       提交MapReduce任务的方式除了上述源代码中给出的之外。还能够使用ToolRunner方式。详细方式为:

    1 ToolRunner.run(new Configuration(),new WordCount(), args); 

     

    至此。我们的MapReduce的启动类要做的事情已经分析完了。

  • 相关阅读:
    webpack-bundle-analyzer使用
    HTTP1.0,HTTP1.1和HTTP2.0区别
    document.readyState
    async和defer
    页面生命周期
    key的理解
    解释型语言和编译型语言
    AMD/CMD/CommonJS与ES6 Module的区别
    vue的keep-alive原理
    数字钱包metaplex-foundation
  • 原文地址:https://www.cnblogs.com/zfyouxi/p/5255299.html
Copyright © 2011-2022 走看看