  • Hadoop Source Code Series -- Client Source Code

    1. Preface

    Starting today we dissect the source code, beginning with the Client, because the Client plays many important roles in the MapReduce process.

    2. The MapReduce Framework Main Class

    The code is as follows:

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration(true);

        // Create a new Job
        Job job = Job.getInstance(conf);
        job.setJarByClass(MyWC.class);

        // Specify various job-specific parameters
        job.setJobName("myjob");

        Path input = new Path("/user/root");
        FileInputFormat.addInputPath(job, input);

        // Delete the output directory if it already exists
        Path output = new Path("/output/wordcount");
        if (output.getFileSystem(conf).exists(output)) {
            output.getFileSystem(conf).delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(MyReducer.class);

        // Submit the job, then poll for progress until the job is complete
        job.waitForCompletion(true);
    }
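
    The driver above references MyWC, MyMapper, and MyReducer, which this post does not show. A minimal word-count pair consistent with the key/value types configured above (Text / IntWritable) might look like the sketch below; it is an illustrative assumption, not the original author's code:

    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit (word, 1) for every whitespace-separated token in the line
            for (String token : value.toString().split("\\s+")) {
                if (!token.isEmpty()) {
                    word.set(token);
                    context.write(word, ONE);
                }
            }
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum the counts emitted for this word
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }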

    Step 1: analyze Job. In the source you can see that Job is declared as public class Job extends JobContextImpl implements JobContext.

    JobContext, in turn, extends MRJobConfig, which defines a large number of configuration constants.

    Because the job is constructed with conf, these constants map directly onto the property values in our configuration files.

      Job  job = Job.getInstance(conf);

    Let's pick out a few important ones:

    public static final int DEFAULT_MAP_MEMORY_MB = 1024; // default memory size, in MB, for a map task
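
    Each of these constants backs a configuration key. As a minimal sketch (assuming the standard key mapreduce.map.memory.mb, which is the property this default pairs with), a user can override it before building the Job:

    Configuration conf = new Configuration(true);
    // If "mapreduce.map.memory.mb" is absent from the configuration files,
    // the framework falls back to DEFAULT_MAP_MEMORY_MB (1024 MB).
    conf.setInt("mapreduce.map.memory.mb", 2048); // request 2 GB per map task
    Job job = Job.getInstance(conf);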

    Step 2: analyze the submission process, job.waitForCompletion(true). Tracing the source shows that the real work is done by the following method:
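
    As a rough sketch of the call chain (condensed from the Hadoop 2.x sources, not verbatim), waitForCompletion() first calls submit(), which hands the job to JobSubmitter.submitJobInternal():

    // Simplified from org.apache.hadoop.mapreduce.Job; details vary by version.
    public boolean waitForCompletion(boolean verbose)
            throws IOException, InterruptedException, ClassNotFoundException {
        if (state == JobState.DEFINE) {
            submit(); // connects to the cluster and calls JobSubmitter.submitJobInternal(...)
        }
        if (verbose) {
            monitorAndPrintJob(); // poll and print progress until the job finishes
        }
        return isSuccessful();
    }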


    JobStatus submitJobInternal(Job job, Cluster cluster)
      throws ClassNotFoundException, InterruptedException, IOException
    1. Checking the input and output specifications of the job. // validate the input and output paths
    2. Computing the InputSplits for the job. // compute the input splits
    3. Setup the requisite accounting information for the DistributedCache of the job, if necessary.
    4. Copying the job's jar and configuration to the map-reduce system directory on the distributed file-system.
    5. Submitting the job to the JobTracker and optionally monitoring its status.

    Within this method, the call to focus on is int maps = writeSplits(job, submitJobDir);

    Tracing it further, the concrete implementation is:

    private <T extends InputSplit>
      int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
          InterruptedException, ClassNotFoundException {
        Configuration conf = job.getConfiguration();
        InputFormat<?, ?> input =
          ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
    
        List<InputSplit> splits = input.getSplits(job);
        T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
    
        // sort the splits into order based on size, so that the biggest
        // go first
        Arrays.sort(array, new SplitComparator());
        JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
            jobSubmitDir.getFileSystem(conf), array);
        return array.length;
      }
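
    Note the sort just before the split files are written: splits are ordered largest-first so that the longest-running map tasks can be scheduled earliest, which tends to improve overall job completion time.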


    Tracing job.getInputFormatClass() reveals the following code:

    public Class<? extends InputFormat<?,?>> getInputFormatClass()
        throws ClassNotFoundException {
      // use the input format named in the user's configuration if present;
      // otherwise fall back to the default, TextInputFormat
      return (Class<? extends InputFormat<?,?>>)
          conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
    }

    So the default input format class is TextInputFormat, and its inheritance chain is as follows:

    TextInputFormat --> FileInputFormat --> InputFormat
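
    If a different format is needed, the user overrides the default via Job.setInputFormatClass; KeyValueTextInputFormat below is just one illustrative alternative that ships with Hadoop:

    // Replace the default TextInputFormat for this job.
    job.setInputFormatClass(KeyValueTextInputFormat.class);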

    Tracing List<InputSplit> splits = input.getSplits(job); leads to the following source:

    This is the single most important piece of source code!

    public List<InputSplit> getSplits(JobContext job) throws IOException {
        Stopwatch sw = new Stopwatch().start();
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); // the user's setting if present, otherwise 1
    long maxSize = getMaxSplitSize(job); // the user's setting if present, otherwise Long.MAX_VALUE
    
        // generate splits
        List<InputSplit> splits = new ArrayList<InputSplit>();
        List<FileStatus> files = listStatus(job);
        for (FileStatus file: files) {
      Path path = file.getPath(); // the path of this input file
      long length = file.getLen(); // the size of this input file
          if (length != 0) {
            BlockLocation[] blkLocations;
            if (file instanceof LocatedFileStatus) {
              blkLocations = ((LocatedFileStatus) file).getBlockLocations();
            } else {
              FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length); // get the locations of all of the file's blocks
            }
            if (isSplitable(job, path)) {
              long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize); // compute the split size
    
              long bytesRemaining = length;
              while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); // pass the split's offset; returns the index of the block containing it
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(), // the block's hosts (including replicas) are handed to the split, so the split knows where to run: computation moves to where the data lives
                        blkLocations[blkIndex].getCachedHosts()));
                bytesRemaining -= splitSize;
              }
    
              if (bytesRemaining != 0) {
                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                           blkLocations[blkIndex].getHosts(),
                           blkLocations[blkIndex].getCachedHosts()));
              }
            } else { // not splitable
              splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                          blkLocations[0].getCachedHosts()));
            }
          } else { 
            //Create empty hosts array for zero length files
            splits.add(makeSplit(path, 0, length, new String[0]));
          }
        }
        // Save the number of input files for metrics/loadgen
        job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
        sw.stop();
        if (LOG.isDebugEnabled()) {
          LOG.debug("Total # of splits generated by getSplits: " + splits.size()
              + ", TimeTaken: " + sw.elapsedMillis());
        }
        return splits;
      }
 1. long splitSize = computeSplitSize(blockSize, minSize, maxSize); tracing this call reveals:
    protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
      }
    

 By default, the split size equals the block size!
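
    A quick sanity check with illustrative numbers, assuming a 128 MB HDFS block and no user settings:

    long blockSize = 128L * 1024 * 1024; // 128 MB HDFS block (illustrative)
    long minSize   = 1L;                 // default when the user sets nothing
    long maxSize   = Long.MAX_VALUE;     // default when the user sets nothing
    // max(1, min(Long.MAX_VALUE, 128 MB)) == 128 MB, i.e. split size == block size
    long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));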

    To make the split size smaller than the block size, lower the configured maximum (maxSize) below the block size.

    To make the split size larger than the block size, raise the configured minimum (minSize) above the block size.

    This is done with FileInputFormat.setMaxInputSplitSize and FileInputFormat.setMinInputSplitSize respectively.
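
    For instance (values illustrative; set only the one matching the direction you want):

    // Shrink splits below a 128 MB block: cap the maximum at 64 MB.
    FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);

    // Or grow splits beyond the block size: raise the minimum to 256 MB.
    FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);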

 2. int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); tracing this call reveals:

     protected int getBlockIndex(BlockLocation[] blkLocations, 
                                  long offset) {
        for (int i = 0 ; i < blkLocations.length; i++) {
          // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){ // the split's offset must be >= the block's start and < the block's end
        return i; // return the index of this block
          }
        }
        BlockLocation last = blkLocations[blkLocations.length -1];
        long fileLength = last.getOffset() + last.getLength() -1;
        throw new IllegalArgumentException("Offset " + offset + 
                                           " is outside of file (0.." +
                                           fileLength + ")");
      }
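
    A worked example with hypothetical numbers: for a 300 MB file stored in 128 MB blocks, block 0 covers [0, 128 MB), block 1 covers [128 MB, 256 MB), and block 2 holds the remainder. A split starting at offset 200 MB satisfies blkLocations[1].getOffset() <= 200 MB < 256 MB, so getBlockIndex returns 1 and that split is later scheduled on block 1's hosts.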
    

 3. splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), ...)

    When a split is created, one split corresponds to one map task, so each split is built from four pieces of information, e.g. (path, 0, 10, hosts).

    The hosts tell the scheduler where the map task can run, which is exactly what "moving computation to the data" means! The split is logical; the underlying data is never physically cut.

    4. The getSplits method ultimately produces a list of splits, and the length of that list is the number of map tasks! It is exactly the value returned at the entry point, int maps = writeSplits(job, submitJobDir);.

    5. When computation moves to the data, each map task reads only the portion of the input file that belongs to its own split. (For text input, for example, the record reader handles split boundaries by skipping a partial first line and reading past the split's end to finish its last line.)

    To be continued... Feel free to follow my WeChat official account, LHWorld.

  • Original post: https://www.cnblogs.com/LHWorldBlog/p/8244881.html