zoukankan      html  css  js  c++  java
  • MapReduce源码分析之JobSplitWriter

    JobSplitWriter被作业客户端用于写分片相关文件,包括分片数据文件job.split和分片元数据信息文件job.splitmetainfo。它有两个静态成员变量,如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 分片版本,当前默认为1  
    2. private static final int splitVersion = JobSplit.META_SPLIT_VERSION;  
    3. // 分片文件头部,为UTF-8格式的字符串"SPL"的字节数组"SPL"  
    4. private static final byte[] SPLIT_FILE_HEADER;  

            并且,提供了一个静态方法,完成SPLIT_FILE_HEADER的初始化,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 静态方法,加载SPLIT_FILE_HEADER为UTF-8格式的字符串"SPL"的字节数组byte[]  
    2. static {  
    3.   try {  
    4.     SPLIT_FILE_HEADER = "SPL".getBytes("UTF-8");  
    5.   } catch (UnsupportedEncodingException u) {  
    6.     throw new RuntimeException(u);  
    7.   }  
    8. }  

            JobSplitWriter实现其功能的为createSplitFiles()方法,它有三种实现,我们先看其中的public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir,Configuration conf, FileSystem fs, T[] splits),代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1.  // 创建分片文件  
    2.  public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir,   
    3.      Configuration conf, FileSystem fs, T[] splits)   
    4.  throws IOException, InterruptedException {  
    5.      
    6. // 调用createFile()方法,创建分片文件,并获取文件系统数据输出流FSDataOutputStream实例out,  
    7. // 对应路径为jobSubmitDir/job.split,jobSubmitDir为参数yarn.app.mapreduce.am.staging-dir指定的路径/作业所属用户user/.staging/作业ID  
    8. FSDataOutputStream out = createFile(fs,   
    9.        JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);  
    10.   
    11. // 调用writeNewSplits()方法,将分片数据写入分片文件,并得到分片元数据信息SplitMetaInfo数组info  
    12.    SplitMetaInfo[] info = writeNewSplits(conf, splits, out);  
    13.      
    14.    // 关闭输出流  
    15.    out.close();  
    16.      
    17.    // 调用writeJobSplitMetaInfo()方法,将分片元数据信息写入分片元数据文件  
    18.    writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),   
    19.        new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,  
    20.        info);  
    21.  }  

            createSplitFiles()方法的逻辑很清晰,大体如下:

            1、调用createFile()方法,创建分片文件,并获取文件系统数据输出流FSDataOutputStream实例out,对应路径为jobSubmitDir/job.split,jobSubmitDir为参数yarn.app.mapreduce.am.staging-dir指定的路径/作业所属用户user/.staging/作业ID;

            2、调用writeNewSplits()方法,将分片数据写入分片文件,并得到分片元数据信息SplitMetaInfo数组info;

            3、关闭输出流out;

            4、调用writeJobSplitMetaInfo()方法,将分片元数据信息写入分片元数据文件。

            我们先来看下createFile()方法,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1.  private static FSDataOutputStream createFile(FileSystem fs, Path splitFile,   
    2.      Configuration job)  throws IOException {  
    3.     
    4. // 调用HDFS文件系统FileSystem的create()方法,获取文件系统数据输出流FSDataOutputStream实例out,  
    5. // 对应权限为JobSubmissionFiles.JOB_FILE_PERMISSION,即0644,rw-r--r--  
    6.    FSDataOutputStream out = FileSystem.create(fs, splitFile,   
    7.        new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));  
    8.      
    9.    // 获取副本数replication,取参数mapreduce.client.submit.file.replication,参数未配置默认为10  
    10.    int replication = job.getInt(Job.SUBMIT_REPLICATION, 10);  
    11.      
    12.    // 通过文件系统FileSystem实例fs的setReplication()方法,设置splitFile的副本数位10  
    13.    fs.setReplication(splitFile, (short)replication);  
    14.      
    15.    // 调用writeSplitHeader()方法写入分片头信息  
    16.    writeSplitHeader(out);  
    17.      
    18.    // 返回文件系统数据输出流out  
    19.    return out;  
    20.  }  

            首先,调用HDFS文件系统FileSystem的create()方法,获取文件系统数据输出流FSDataOutputStream实例out,对应权限为JobSubmissionFiles.JOB_FILE_PERMISSION,即0644,rw-r--r--;

            其次,获取副本数replication,取参数mapreduce.client.submit.file.replication,参数未配置默认为10;

            接着,通过文件系统FileSystem实例fs的setReplication()方法,设置splitFile的副本数位10;

            然后,调用writeSplitHeader()方法写入分片头信息;

            最后,返回文件系统数据输出流out。

            writeSplitHeader()方法专门用于将分片头部信息写入分片文件,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1.  private static void writeSplitHeader(FSDataOutputStream out)   
    2.  throws IOException {  
    3.     
    4. // 文件系统数据输出流out写入byte[],内容为UTF-8格式的"SPL"  
    5.    out.write(SPLIT_FILE_HEADER);  
    6.    // 文件系统数据输出流out写入int,分片版本号,目前为1  
    7.    out.writeInt(splitVersion);  
    8.  }  

            很简单,首先文件系统数据输出流out写入byte[],内容为UTF-8格式的"SPL",然后文件系统数据输出流out写入int,分片版本号,目前为1。

            接下来,我们再看下writeNewSplits()方法,它将分片数据写入分片文件,并得到分片元数据信息SplitMetaInfo数组info,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1.  @SuppressWarnings("unchecked")  
    2.  private static <T extends InputSplit>   
    3.  SplitMetaInfo[] writeNewSplits(Configuration conf,   
    4.      T[] array, FSDataOutputStream out)  
    5.  throws IOException, InterruptedException {  
    6.   
    7. // 根据array的大小,构造同等大小的分片元数据信息SplitMetaInfo数组info,  
    8. // array其实是传入的分片数组  
    9.    SplitMetaInfo[] info = new SplitMetaInfo[array.length];  
    10.    if (array.length != 0) {// 如果array中有数据  
    11.       
    12.      // 创建序列化工厂SerializationFactory实例factory  
    13.      SerializationFactory factory = new SerializationFactory(conf);  
    14.      int i = 0;  
    15.        
    16.      // 获取最大的数据块位置maxBlockLocations,取参数mapreduce.job.max.split.locations,参数未配置默认为10  
    17.      int maxBlockLocations = conf.getInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY,  
    18.          MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT);  
    19.        
    20.      // 通过输出流out的getPos()方法获取输出流out的当前位置offset  
    21.      long offset = out.getPos();  
    22.        
    23.      // 遍历数组array中每个元素split  
    24.        
    25.      for(T split: array) {  
    26.         
    27.     // 通过输出流out的getPos()方法获取输出流out的当前位置prevCount  
    28.        long prevCount = out.getPos();  
    29.          
    30.        // 往输出流out中写入String,内容为split对应的类名  
    31.        Text.writeString(out, split.getClass().getName());  
    32.          
    33.        // 获取序列化器Serializer实例serializer  
    34.        Serializer<T> serializer =   
    35.          factory.getSerializer((Class<T>) split.getClass());  
    36.          
    37.        // 打开serializer,接入输出流out  
    38.        serializer.open(out);  
    39.          
    40.        // 将split序列化到输出流out  
    41.        serializer.serialize(split);  
    42.          
    43.        // 通过输出流out的getPos()方法获取输出流out的当前位置currCount  
    44.        long currCount = out.getPos();  
    45.          
    46.        // 通过split的getLocations()方法,获取位置信息locations  
    47.        String[] locations = split.getLocations();  
    48.        if (locations.length > maxBlockLocations) {  
    49.          LOG.warn("Max block location exceeded for split: "  
    50.              + split + " splitsize: " + locations.length +  
    51.              " maxsize: " + maxBlockLocations);  
    52.          locations = Arrays.copyOf(locations, maxBlockLocations);  
    53.        }  
    54.          
    55.        // 构造split对应的元数据信息,并加入info指定位置,  
    56.        // offset为当前split在split文件中的起始位置,数据长度为split.getLength(),位置信息为locations  
    57.        info[i++] =   
    58.          new JobSplit.SplitMetaInfo(   
    59.              locations, offset,  
    60.              split.getLength());  
    61.          
    62.        // offset增加当前split已写入数据大小  
    63.        offset += currCount - prevCount;  
    64.      }  
    65.    }  
    66.      
    67.    // 返回分片元数据信息SplitMetaInfo数组info  
    68.    return info;  
    69.  }  

            writeNewSplits()方法的逻辑比较清晰,大体如下:

            1、根据array的大小,构造同等大小的分片元数据信息SplitMetaInfo数组info,array其实是传入的分片数组;

            2、如果array中有数据:

                  2.1、创建序列化工厂SerializationFactory实例factory;

                  2.2、获取最大的数据块位置maxBlockLocations,取参数mapreduce.job.max.split.locations,参数未配置默认为10;

                  2.3、通过输出流out的getPos()方法获取输出流out的当前位置offset;

                  2.4、遍历数组array中每个元素split:

                           2.4.1、通过输出流out的getPos()方法获取输出流out的当前位置prevCount;

                           2.4.2、往输出流out中写入String,内容为split对应的类名;

                           2.4.3、获取序列化器Serializer实例serializer;

                           2.4.4、打开serializer,接入输出流out;

                           2.4.5、将split序列化到输出流out;

                           2.4.6、通过输出流out的getPos()方法获取输出流out的当前位置currCount;

                           2.4.7、通过split的getLocations()方法,获取位置信息locations;

                           2.4.8、确保位置信息locations的长度不能超过maxBlockLocations,超过则截断;

                           2.4.9、构造split对应的元数据信息,并加入info指定位置,offset为当前split在split文件中的起始位置,数据长度为split.getLength(),位置信息为locations;

                           2.4.10、offset增加当前split已写入数据大小;

            3、返回分片元数据信息SplitMetaInfo数组info。

            其中,序列化split对象时,我们以FileSplit为例来分析,其write()方法如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1.  @Override  
    2.  public void write(DataOutput out) throws IOException {  
    3. // 写入文件路径全名  
    4.    Text.writeString(out, file.toString());  
    5.    // 写入分片在文件中的起始位置  
    6.    out.writeLong(start);  
    7.    // 写入分片在文件中的长度  
    8.    out.writeLong(length);  
    9.  }  

            比较简单,分别写入文件路径全名、分片在文件中的起始位置、分片在文件中的长度三个信息。

            综上所述,分片文件job.split文件的内容为:

            1、文件头:"SPL"+int类型版本号1;

            2、分片类信息:String类型split对应类名;

            3、分片数据信息:String类型文件路径全名+Long类型分片在文件中的起始位置+Long类型分片在文件中的长度。

            而在最后,构造分片元数据信息时,产生的是JobSplit的静态内部类SplitMetaInfo对象,包括分片位置信息locations、split在split文件中的起始位置offset、分片长度split.getLength()。

            下面,我们再看下分片的元数据信息文件是如何产生的,让我们来研究下writeJobSplitMetaInfo()方法,代码如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1.  // 写入作业分片元数据信息  
    2.  private static void writeJobSplitMetaInfo(FileSystem fs, Path filename,   
    3.      FsPermission p, int splitMetaInfoVersion,   
    4.      JobSplit.SplitMetaInfo[] allSplitMetaInfo)   
    5.  throws IOException {  
    6.    // write the splits meta-info to a file for the job tracker  
    7. // 调用HDFS文件系统FileSystem的create()方法,生成分片元数据信息文件,并获取文件系统数据输出流FSDataOutputStream实例out,  
    8. // 对应文件路径为jobSubmitDir/job.splitmetainfo,jobSubmitDir为参数yarn.app.mapreduce.am.staging-dir指定的路径/作业所属用户user/.staging/作业ID  
    9. // 对应权限为JobSubmissionFiles.JOB_FILE_PERMISSION,即0644,rw-r--r--  
    10.    FSDataOutputStream out =   
    11.      FileSystem.create(fs, filename, p);  
    12.      
    13.    // 写入分片元数据头部信息UTF-8格式的字符串"META-SPL"的字节数组byte[]  
    14.    out.write(JobSplit.META_SPLIT_FILE_HEADER);  
    15.      
    16.    // 写入分片元数据版本号splitMetaInfoVersion,当前为1  
    17.    WritableUtils.writeVInt(out, splitMetaInfoVersion);  
    18.    // 写入分片元数据个数,为分片元数据信息SplitMetaInfo数组个数allSplitMetaInfo.length  
    19.    WritableUtils.writeVInt(out, allSplitMetaInfo.length);  
    20.      
    21.    // 遍历分片元数据信息SplitMetaInfo数组allSplitMetaInfo中每个splitMetaInfo,挨个写入输出流  
    22.    for (JobSplit.SplitMetaInfo splitMetaInfo : allSplitMetaInfo) {  
    23.      splitMetaInfo.write(out);  
    24.    }  
    25.      
    26.    // 关闭输出流out  
    27.    out.close();  
    28.  }  

            writeJobSplitMetaInfo()方法的主体逻辑也十分清晰,大体如下:

            1、调用HDFS文件系统FileSystem的create()方法,生成分片元数据信息文件,并获取文件系统数据输出流FSDataOutputStream实例out,对应文件路径为jobSubmitDir/job.splitmetainfo,jobSubmitDir为参数yarn.app.mapreduce.am.staging-dir指定的路径/作业所属用户user/.staging/作业ID,对应权限为JobSubmissionFiles.JOB_FILE_PERMISSION,即0644,rw-r--r--;

            2、写入分片元数据头部信息UTF-8格式的字符串"META-SPL"的字节数组byte[];

            3、写入分片元数据版本号splitMetaInfoVersion,当前为1;

            4、写入分片元数据个数,为分片元数据信息SplitMetaInfo数组个数allSplitMetaInfo.length;

            5、遍历分片元数据信息SplitMetaInfo数组allSplitMetaInfo中每个splitMetaInfo,挨个写入输出流;

            6、关闭输出流out。
            我们看下如何序列化JobSplit.SplitMetaInfo,将其写入文件,JobSplit.SplitMetaInfo的write()如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. public void write(DataOutput out) throws IOException {  
    2.     
    3.   // 将分片位置个数写入分片元数据信息文件  
    4.   WritableUtils.writeVInt(out, locations.length);  
    5.   // 遍历位置信息,写入分片元数据信息文件  
    6.   for (int i = 0; i < locations.length; i++) {  
    7.     Text.writeString(out, locations[i]);  
    8.   }  
    9.   // 写入分片元数据信息的起始位置  
    10.   WritableUtils.writeVLong(out, startOffset);  
    11.   // 写入分片大小  
    12.   WritableUtils.writeVLong(out, inputDataLength);  
    13. }  

            每个分片的元数据信息,包括分片位置个数、分片文件位置、分片元数据信息的起始位置、分片大小等内容。

            总结

            JobSplitWriter被作业客户端用于写分片相关文件,包括分片数据文件job.split和分片元数据信息文件job.splitmetainfo。分片数据文件job.split存储的主要是每个分片对应的HDFS文件路径,和其在HDFS文件中的起始位置、长度等信息,而分片元数据信息文件job.splitmetainfo存储的则是每个分片在分片数据文件job.split中的起始位置、分片大小等信息。

            job.split文件内容:文件头 + 分片 + 分片 + ... + 分片

            文件头:"SPL" + 版本号1

            分片:分片类 + 分片数据,分片类=String类型split对应类名,分片数据=String类型HDFS文件路径全名+Long类型分片在HDFS文件中的起始位置+Long类型分片在HDFS文件中的长度

            job.splitmetainfo文件内容:文件头 + 分片元数据个数 + 分片元数据 + 分片元数据 + ... + 分片元数据

            文件头:"META-SPL" + 版本号1

            分片元数据个数:分片元数据的个数

            分片元数据:分片位置个数+分片位置+在分片文件job.split中的起始位置+分片大小

  • 相关阅读:
    Oracle什么时候需要Commit
    Oracle数据库SQL优化
    事务的四个特性(ACID)
    MySQL的两种存储引擎storage engine特点和对比
    MySql绿色版下载,安装,配置详解
    Unsupported major.minor version 51.0
    linux服务器下添加字体
    java日期比较,日期计算
    微信公众号开发保持用户登录状态思路
    本机搭建外网web服务器
  • 原文地址:https://www.cnblogs.com/jirimutu01/p/5556356.html
Copyright © 2011-2022 走看看