zoukankan      html  css  js  c++  java
  • hadoop输入分片计算(Map Task个数的确定)

      作业从JobClient端的submitJobInternal()方法提交作业的同时,调用InputFormat接口的getSplits()方法来创建split。默认是使用InputFormat的子类FileInputFormat来计算分片,而split的默认实现为FileSplit(其父接口为InputSplit)。这里要注意,split只是逻辑上的概念,并不对文件做实际的切分。一个split记录了一个Map Task要处理的文件区间,所以分片要记录其对应的文件偏移量以及长度等。每个split由一个Map Task来处理,所以有多少split,就有多少Map Task。下面着重分析这个方法:

     1 public List<InputSplit> getSplits(JobContext job
     2                                     ) throws IOException {
     3     //getFormatMinSplitSize():始终返回1
     4     //getMinSplitSize(job):获取” mapred.min.split.size”的值,默认为1
     5     long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
     6     
     7     //getMaxSplitSize(job):获取"mapred.max.split.size"的值,
     8     //默认配置文件中并没有这一项,所以其默认值为” Long.MAX_VALUE”,即2^63 – 1
     9     long maxSize = getMaxSplitSize(job);
    10 
    11     // generate splits
    12     List<InputSplit> splits = new ArrayList<InputSplit>();
    13     List<FileStatus>files = listStatus(job);
    14     for (FileStatus file: files) {
    15       Path path = file.getPath();
    16       FileSystem fs = path.getFileSystem(job.getConfiguration());
    17       long length = file.getLen();
    18       BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
    19       if ((length != 0) && isSplitable(job, path)) { 
    20         long blockSize = file.getBlockSize();
    21         //计算split大小
    22         long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    23 
    24         //计算split个数
    25         long bytesRemaining = length;    //bytesRemaining表示剩余字节数
    26         while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { //SPLIT_SLOP=1.1
    27           int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
    28           splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
    29                                    blkLocations[blkIndex].getHosts()));
    30           bytesRemaining -= splitSize;
    31         }
    32         
    33         if (bytesRemaining != 0) {
    34           splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 
    35                      blkLocations[blkLocations.length-1].getHosts()));
    36         }
    37       } else if (length != 0) {
    38         splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
    39       } else { 
    40         //Create empty hosts array for zero length files
    41         splits.add(new FileSplit(path, 0, length, new String[0]));
    42       }
    43     }
    44     
    45     // Save the number of input files in the job-conf
    46     job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    47 
    48     LOG.debug("Total # of splits: " + splits.size());
    49     return splits;
    50   }

      首先计算分片的下限和上限:minSize和maxSize,具体的过程在注释中已经说清楚了。接下来用这两个值再加上blockSize来计算实际的split大小,过程也很简单,具体代码如下:

    1 protected long computeSplitSize(long blockSize, long minSize,
    2                                   long maxSize) {
    3     return Math.max(minSize, Math.min(maxSize, blockSize));
    4   }

      接下来就是计算实际的分片个数了。针对每个输入文件,计算input split的个数。while循环的含义如下:

      a)  文件剩余字节数/splitSize>1.1,创建一个split,这个split的字节数=splitSize,文件剩余字节数=文件大小 - splitSize

      b)  文件剩余字节数/splitSize<1.1,剩余的部分全都作为一个split(这主要是考虑到,不用为剩余的很少的字节数一些启动一个Map Task)

      

      我们发现,在默认配置下,split大小和block大小是相同的。这是不是为了防止这种情况:

    一个split如果对应的多个block,若这些block大多不在本地,则会降低Map Task的本地性,降低效率。

      到这里split的划分就介绍完了,但是有两个问题需要考虑:

    1、如果一个record跨越了两个block该怎么办?

      这个可以看到,在Map Task读取block的时候,每次是读取一行的,如果发现块的开头不是上一个文件的结束,那么抛弃第一条record,因为这个record会被上一个block对应的Map Task来处理。那么,第二个问题来了:

    2、上一个block对应的Map Task并没有最后一条完整的record,它又该怎么办?

      一般来说,Map Task在读block的时候都会多读后续的几个block,以处理上面的这种情况。不过这部分的代码我还没有看到,等看到了再补充吧。

      本文基于hadoop1.2.1

      如有错误,还请指正

      参考文章:《Hadoop技术内幕 深入理解MapReduce架构设计与实现原理》 董西成

      转载请注明出处:http://www.cnblogs.com/gwgyk/p/4113929.html

     

  • 相关阅读:
    随机森林算法参数调优
    BAYES和朴素BAYES
    阿里云 金融接口 token PHP
    PHP mysql 按时间分组 表格table 跨度 rowspan
    MySql按周,按月,按日分组统计数据
    PHP 获取今日、昨日、本周、上周、本月的等等常用的起始时间戳和结束时间戳的时间处理类
    thinkphp5 tp5 会话控制 session 登录 退出 检查检验登录 判断是否应该跳转到上次url
    微信 模板消息
    php 腾讯 地图 api 计算 坐标 两点 距离 微信 网页 WebService API
    php添加http头禁止浏览器缓存
  • 原文地址:https://www.cnblogs.com/gwgyk/p/4113929.html
Copyright © 2011-2022 走看看