zoukankan      html  css  js  c++  java
  • Hadoop的数据输入的源码解析

      我们知道,任何一个工程项目,最重要的是三个部分:输入,中间处理,输出。今天我们来深入的了解一下我们熟知的Hadoop系统中,输入是如何输入的?

      在hadoop中,输入数据都是通过对应的InputFormat类和RecordReader类来实现的,其中InputFormat来实现将对应输入文件进行分片,RecordReader类将对应分片中的数据读取进来。具体的方式如下:

    (1)InputFormat类是一个接口。

    public interface InputFormat<K, V> {

     

      InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

     

      RecordReader<K, V> getRecordReader(InputSplit split,

                                         JobConf job,

                                         Reporter reporter) throws IOException;

    }

    (2)FileInputFormat类实现了InputFormat接口。该类实现了getSplits方法,但是它也没有实现对应的getRecordReader方法。也就是说FileInputFormat还是一个抽象类。这里需要说明的一个问题是,FileInputFormat用isSplitable方法来指定对应的文件是否支持数据的切分。默认情况下都是支持的,一般子类都需要重新实现它。

    public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {

      public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {

        FileStatus[] files = listStatus(job);

        // Save the number of input files in the job-conf

        job.setLong(NUM_INPUT_FILES, files.length);

        long totalSize = 0;                           // compute total size

        for (FileStatus file: files) {                // check we have valid files

          if (file.isDir()) {

            throw new IOException("Not a file: "+ file.getPath());

          }

          totalSize += file.getLen();

        }

        long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);

        long minSize = Math.max(job.getLong("mapred.min.split.size", 1),

                                minSplitSize);

        // generate splits

        ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);

        NetworkTopology clusterMap = new NetworkTopology();

        for (FileStatus file: files) {

          Path path = file.getPath();

          FileSystem fs = path.getFileSystem(job);

          long length = file.getLen();

          BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);

          if ((length != 0) && isSplitable(fs, path)) {

            long blockSize = file.getBlockSize();

            long splitSize = computeSplitSize(goalSize, minSize, blockSize);

     

            long bytesRemaining = length;

            while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {

              String[] splitHosts = getSplitHosts(blkLocations,

                  length-bytesRemaining, splitSize, clusterMap);

              splits.add(new FileSplit(path, length-bytesRemaining, splitSize,

                  splitHosts));

              bytesRemaining -= splitSize;

            }

            if (bytesRemaining != 0) {

              splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,

                         blkLocations[blkLocations.length-1].getHosts()));

            }

          } else if (length != 0) {

            String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);

            splits.add(new FileSplit(path, 0, length, splitHosts));

          } else {

            //Create empty hosts array for zero length files

            splits.add(new FileSplit(path, 0, length, new String[0]));

          }

        }

        LOG.debug("Total # of splits: " + splits.size());

        return splits.toArray(new FileSplit[splits.size()]);

      }

     

    //该方法是用来判断是否可以进行数据的切分

      protected boolean isSplitable(FileSystem fs, Path filename) {

        return true;

      }

    //但是它也没有实现对应的getRecordReader方法。也就是说FileInputFormat还是一个抽象类。

      public abstract RecordReader<K, V> getRecordReader(InputSplit split,

                                                   JobConf job,

                                                   Reporter reporter)  throws IOException;

     }

    (3)TextFileInputFormat类仅仅实现了FileInputFormat类的getRecordReader方法,并且重写了isSplitable方法,他并没有实现getSplits方法,由此可知,他的getSplits的实现还是交由父类FileInputFormat来实现的。(这里需要注意TextFileInputFormat并不是InputFormat的子类,TextFileInputFormat它仅仅是继承了InputFormat的getRecordReader的方法而已。)

    public class TextInputFormat extends FileInputFormat<LongWritable, Text>

      implements JobConfigurable {

     

      private CompressionCodecFactory compressionCodecs = null;

     

      public void configure(JobConf conf) {

        compressionCodecs = new CompressionCodecFactory(conf);

      }

     

      //子类重新实现了isSplitable方法

      protected boolean isSplitable(FileSystem fs, Path file) {

        final CompressionCodec codec = compressionCodecs.getCodec(file);

        if (null == codec) {

          return true;

        }

        return codec instanceof SplittableCompressionCodec;

      }

     //该方法实现了将文件中的数据读入到对应的Map方法中。

      public RecordReader<LongWritable, Text> getRecordReader(

                                              InputSplit genericSplit, JobConf job,

                                              Reporter reporter)

        throws IOException {

       

        reporter.setStatus(genericSplit.toString());

        String delimiter = job.get("textinputformat.record.delimiter");

        byte[] recordDelimiterBytes = null;

        if (null != delimiter) recordDelimiterBytes = delimiter.getBytes();

        return new LineRecordReader(job, (FileSplit) genericSplit,

            recordDelimiterBytes);

      }

    }

     

    从上面可以看出一个Text格式的文件是通过什么样的类继承层次输入到map方法中。下面主要介绍一下,到底是如何切分的?我们从类的继承层次关系上可以看出,具体的切分方式是通过FileInputFormat类来实现的。因此,要了解文件是如何切分的,只需要查看一下FileInputFormat类中的getSplits方法的实现细节即可。下面我再次把FileInputFormat类中的getSplits方法贴出来:然后分析每一句代码。

      public InputSplit[] getSplits(JobConf job, int numSplits)

        throws IOException {

        FileStatus[] files = listStatus(job); //列出当前job中所有的输入文件

       

        // Save the number of input files in the job-conf

    job.setLong(NUM_INPUT_FILES, files.length); //设置当前job的输入文件数目

     

    //计算当前job所有输入文件总的大小

    long totalSize = 0;                           // compute total size

    //遍历每一个文件

        for (FileStatus file: files) {               // check we have valid files

          if (file.isDir()) {

            throw new IOException("Not a file: "+ file.getPath());

          }

          totalSize += file.getLen();

        }

    // numSplits是分片数,goalSize是平均每一个分片的大小,minSize是每个分片最小值

        long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);

        long minSize = Math.max(job.getLong("mapred.min.split.size", 1),

                                minSplitSize);

     

        // generate splits  计算分片

        ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);

        NetworkTopology clusterMap = new NetworkTopology();

        for (FileStatus file: files) {

          Path path = file.getPath();

          FileSystem fs = path.getFileSystem(job);

          long length = file.getLen();

        //获取文件的位置

          BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);

      //isSplitable方法根据对应文件名称判断对应文件是否可以切分

          if ((length != 0) && isSplitable(fs, path)) {

            long blockSize = file.getBlockSize();//获取文件块的大小

            // computeSplitSize方法计算真正的分片大小

            long splitSize = computeSplitSize(goalSize, minSize, blockSize);

     

            long bytesRemaining = length;//文件剩余大小

     

        // SPLIT_SLOP=1.1,文件大小/分片的大小> SPLIT_SLOP则进行切分。

            while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {

            // splitHosts用来记录分片元数据信息(包括切片的位置,大小等等)

              String[] splitHosts = getSplitHosts(blkLocations,

                  length-bytesRemaining, splitSize, clusterMap);

              splits.add(new FileSplit(path, length-bytesRemaining, splitSize,

                  splitHosts));

              bytesRemaining -= splitSize;

            }

           

            if (bytesRemaining != 0) {

              splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,

                         blkLocations[blkLocations.length-1].getHosts()));

            }

          } else if (length != 0) { //如果文件不能切分,相应的会将整个文件作为一个分片。

            String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);

            splits.add(new FileSplit(path, 0, length, splitHosts));

          } else {

            //Create empty hosts array for zero length files

            splits.add(new FileSplit(path, 0, length, new String[0]));

          }

        }

        LOG.debug("Total # of splits: " + splits.size());

        return splits.toArray(new FileSplit[splits.size()]);

      }

     

    //真正计算分片大小的地方。

    protected long computeSplitSize(long goalSize, long minSize,

                                           long blockSize) {

        return Math.max(minSize, Math.min(goalSize, blockSize));

      }

     

    综上所述,对于MR的输入文件采用的方式是通过FileInputFormat类来进行数据的切分,在切分之前,是通过isSplitable方法来判断是否可以切分,若不能切分,则会将整个文件作为一个分片作为输入。因此,若有业务需求需要对应文件不能进行切分的话,可以将isSplitable方法方位false即可。

    这里还需要注意一个问题,倘若你的文件都是小文件的话,对应的getSplits方法也不会对其进行切分的。一般情况小文件指的是其大小小于对应hadoop中HDFS的块的大小(128M)。

     

     

  • 相关阅读:
    安防教育直播项目应用中RTSPSever组件libEasyRTSPServer编译arm版本报undefined reference to错误问题排查
    RTMP/RTSP/GB28181等协议TSINGSEE青犀能力开放平台汇总
    “Visual Studio.net已检测到指定的Web服务器运行的不是Asp.net1.1版。您将无法运行Asp.net Web应用程序或服务”问题的解决方案
    四大图像库的使用感受:OpenCV/FreeImage/CImg/CxImage
    我做的Java课件——http://sit.uibe.edu.cn/java/
    ARP协议工作原理
    ARP欺骗源码(基于WinPcap实现)
    多线程调用DBUS服务注意事项
    汉译英教程
    IIS5.1安装步骤及测试
  • 原文地址:https://www.cnblogs.com/ljy2013/p/5083310.html
Copyright © 2011-2022 走看看