zoukankan      html  css  js  c++  java
  • Sqoop-1.4.6 Merge源码分析与改造使其支持多个merge-key

      Sqoop中提供了一个用于合并数据集的工具sqoop-merge。官方文档中的描述可以参考我的另一篇博客Sqoop-1.4.5用户手册
      Merge的基本原理是,需要指定新数据集和老数据集的路径,根据某个merge-key,在reduce过程中,优先取出新数据集中的数据,共同合并成新的全量数据。具体的逻辑分析可以稍后通过看Sqoop-1.4.6的源码来进一步了解。
      但是,在原生的Sqoop中,目前只支持merge-key为一个字段的情况,本文通过分析源代码并对源代码进行更改,可以在使用Sqoop的Merge功能时支持任意多个merge-key。

    一、Sqoop Merge Tool使用示例

      在这里模拟一次数据增量同步到hive中的过程。

    1、数据准备

      有一张分区表sqoop_merge_all,分区字段pt,在hdfs上的文件存储路径为:hdfs://m000/user/hive/warehouse/sqoop_merge_all,表结构如下:

    字段 类型
    id int
    type string
    comments string
    updatetime string

      另外有一张增量表sqoop_merge_inc,该表的hdfs路径为hdfs://m000/user/hive/warehouse/sqoop_merge_inc
      看一下这两张表中的数据:
    sqoop_merge_all (pt=20160810)
      这里写图片描述
    假设这里存的是20160810的全量数据。
    sqoop_merge_inc
      这里写图片描述
    假设20160811当天,对type=type1的记录进行了更新,并且新增了一条type=type3的记录。
      现在,需要把sqoop_merge_inc中的两条记录与sqoop_merge_all分区 (pt=20160810)中的两条记录进行合并,合并后的数据存入sqoop_merge_all的分区 (pt=20160811)
      按照正常逻辑,id=1的那条记录被更新成id=3的那一条,id=4的记录新增,那么最终sqoop_merge_all的分区 (pt=20160811)中应该包含的记录分别为id=2,id=3,id=4这三条。

    2、Sqoop Merge操作

      完整的sqoop merge命令如下:

    sqoop merge 
      --new-data hdfs://m000/user/hive/warehouse/sqoop_merge_inc 
      --onto hdfs://m000/user/hive/warehouse/sqoop_merge_all/pt=20160810 
      --target-dir hdfs://m000/user/hive/warehouse/sqoop_merge_all/pt=20160811 
      --jar-file /usr/local/sqoop/bindir/sqoop_merge.jar 
      --class-name sqoop_merge 
      --merge-key type

      简单说明一下,上面这句命令中表示,将新的数据集(参数--new-data)与已有的数据集(参数--onto)进行合并,合并后的数据存入(参数--target-dir)路径下。需要指定这次合并中使用的表的结构jar包和class。根据type字段进行合并。
      最终结果如下,与上面期望的相一致。
      这里写图片描述

      我们看一眼sqoop_merge.jar以及sqoop_merge.class的内容,基本上可以理解成这个类是sqoop_merge表的一个java bean定义。
      这里写图片描述

    二、Sqoop Merge Tool实现原理

      那么,Sqoop是如何实现上面这个功能的呢?

    1、脚本分析

      在$SQOOP_HOME/bin路径下,有一个sqoop-merge脚本。
    (1)sqoop-merge
      该脚本主要逻辑就是调用了sqoop merge命令,并把类似于--onto之类的参数都传入。

    prgm=`readlink -f $0`
    bin=`dirname ${prgm}`
    bin=`cd ${bin} && pwd`
    
    exec ${bin}/sqoop merge "$@" 

    (2)sqoop
      该脚本中,对参数进行处理后,最终执行下面这一句。把merge也一并传入。

    source ${bin}/configure-sqoop "${bin}"
    exec ${HADOOP_COMMON_HOME}/bin/hadoop org.apache.sqoop.Sqoop "$@"

    2、Java源代码

      查看Sqoop源代码,可以看到Sqoop是使用Java语言实现的。我们首先找到上面脚本中使用的类。由于每个类中的方法调用过程比较麻烦,接下来只分析主要代码,首先,完整的调用链如下所示。

    org.apache.sqoop.Sqoop#main
    --> org.apache.sqoop.Sqoop#runTool(args) 
    --> org.apache.sqoop.Sqoop#runTool(args, new Configuration())
    --> org.apache.sqoop.Sqoop#runSqoop(Sqoop sqoop, args[1...end])
    --> org.apache.hadoop.util.ToolRunner.run()
    --> tool.run()

      最后这个tool的类型会详细分析。
      
    (1)org.apache.sqoop.Sqoop
      这个类是整个调用链的入口,这个类中主要方法的逻辑如下。
      在第二个runTool方法调用处,会根据传入的第一个参数(即我们传入的”merge”),生成一个SqoopTool类型的tool对象。

      public static int runTool(String [] args, Configuration conf) {
          ...
          String toolName = expandedArgs[0];
          Configuration pluginConf = SqoopTool.loadPlugins(conf);
          SqoopTool tool = SqoopTool.getTool(toolName);
          ...
          Sqoop sqoop = new Sqoop(tool, pluginConf);
          // 除去"merge"参数之外的其他参数,一起传入runSqoop中
          return runSqoop(sqoop, Arrays.copyOfRange(expandedArgs, 1, expandedArgs.length));
      }

      那么这个tool对象到底是个什么呢?跟踪进入com.cloudera.sqoop.tool#getTool方法,进入org.apache.sqoop.tool.SqoopTool#getTool方法。

    (2)org.apache.sqoop.tool.SqoopTool
      这里面,是从一个名为TOOLS的Map中,根据toolName获取对应的类对象。从下面代码中我们可以看到Sqoop支持的所有Tool,并且merge对应的Tool是MergeTool类型的。

    private static final Map<String, Class<? extends SqoopTool>> TOOLS;
    TOOLS = new TreeMap<String, Class<? extends SqoopTool>>();
    ...
    // registerTool方法最终都会向TOOLS中put一个对象
    registerTool("codegen", CodeGenTool.class,
        "Generate code to interact with database records");
    registerTool("create-hive-table", CreateHiveTableTool.class,
        "Import a table definition into Hive");
    registerTool("eval", EvalSqlTool.class,
        "Evaluate a SQL statement and display the results");
    registerTool("export", ExportTool.class,
        "Export an HDFS directory to a database table");
    registerTool("import", ImportTool.class,
        "Import a table from a database to HDFS");
    registerTool("import-all-tables", ImportAllTablesTool.class,
        "Import tables from a database to HDFS");
    registerTool("import-mainframe", MainframeImportTool.class,
            "Import datasets from a mainframe server to HDFS");
    registerTool("help", HelpTool.class, "List available commands");
    registerTool("list-databases", ListDatabasesTool.class,
        "List available databases on a server");
    registerTool("list-tables", ListTablesTool.class,
        "List available tables in a database");
    registerTool("merge", MergeTool.class,
        "Merge results of incremental imports");
    registerTool("metastore", MetastoreTool.class,
        "Run a standalone Sqoop metastore");
    registerTool("job", JobTool.class,
        "Work with saved jobs");
    registerTool("version", VersionTool.class,
        "Display version information");

      最前面那个调用链最后那个tool对象,对应的就是MergeTool类型。那么接下来我们进入到MergeTool#run中。

    (3)org.apache.sqoop.tool.MergeTool
      在这个方法中,生成一个MergeJob对象,然后通过该mergeJob的runMergeJob方法,运行一个MapReduce任务。

      public int run(SqoopOptions options) {
        try {
          // Configure and execute a MapReduce job to merge these datasets.
          MergeJob mergeJob = new MergeJob(options);
          if (!mergeJob.runMergeJob()) {
            LOG.error("MapReduce job failed!");
            return 1;
          }
        } catch (IOException ioe) {
          ...
        }
        return 0;
      }

    (4)org.apache.sqoop.mapreduce.MergeJob
      这个类的runMergeJob方法是一个标准的MapReduce程序。我们主要跟踪其Mapper类和Reducer类。

      public boolean runMergeJob() throws IOException {
          ...
          if (ExportJobBase.isSequenceFiles(jobConf, newPath)) {
            job.setInputFormatClass(SequenceFileInputFormat.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            job.setMapperClass(MergeRecordMapper.class);
          } else {
            job.setMapperClass(MergeTextMapper.class);
            job.setOutputFormatClass(RawKeyTextOutputFormat.class);
          }
          ...
          job.setReducerClass(MergeReducer.class);
      }

      使用的Reducer类是MergeReducer,根据文件类型分别生成MergeRecordMapperMergeTextMapper类型的Mapper类。但是,不管表文件类型是什么,这两个Mapper类,最终共同继承了MergeMapperBase类,并且在各自的map方法中,调用了MergeMapperBase#processRecord方法,map阶段的主要逻辑也就在该方法中。

    (5)org.apache.sqoop.mapreduce.MergeMapperBase
      这里我们只分析processRecord方法。
      在这个方法中我们看到,每一条记录对应一个MergeRecord对象,这个对象最后会在map的输出中输出到reduce阶段。fieldMap是一个Map类型,其key就是我们通过参数--merge-key指定的字段,根据该字段名称,从fieldMap中取出当前记录该字段的值,转化成String后,当作map的输出,与MergeRecord对象一起交给Reduce来处理。

      protected void processRecord(SqoopRecord r, Context c)
          throws IOException, InterruptedException {
        MergeRecord mr = new MergeRecord(r, isNew);
        Map<String, Object> fieldMap = r.getFieldMap();
        if (null == fieldMap) {
          throw new IOException("No field map in record " + r);
        }
        Object keyObj = fieldMap.get(keyColName);
        if (null == keyObj) {
          throw new IOException("Cannot join values on null key. "
              + "Did you specify a key column that exists?");
        } else {
          c.write(new Text(keyObj.toString()), mr);
        }
      }

    (6)org.apache.sqoop.mapreduce.MergeReducer
      这个类中,reduce方法的逻辑如下。  
      取出MergeRecord集合中相同key的所有记录,如果新数据集中不包含当前字段值的记录,则从旧的数据集中取该条记录。如果新旧数据集中都有该记录,则从新的数据集中取出该记录。

      public void reduce(Text key, Iterable<MergeRecord> vals, Context c)
          throws IOException, InterruptedException {
        SqoopRecord bestRecord = null;
        try {
          for (MergeRecord val : vals) {
            if (null == bestRecord && !val.isNewRecord()) {
              // Use an old record if we don't have a new record.
              bestRecord = (SqoopRecord) val.getSqoopRecord().clone();
            } else if (val.isNewRecord()) {
              bestRecord = (SqoopRecord) val.getSqoopRecord().clone();
            }
          }
        } catch (CloneNotSupportedException cnse) {
          throw new IOException(cnse);
        }
    
        if (null != bestRecord) {
          c.write(bestRecord, NullWritable.get());
        }
      }

    三、Sqoop Merge Tool源码修改

      从上面源代码过程分析可以看到,merge过程只能指定一个字段,如果指定多个字段时,会报如下的错,提示当前指定的字段不存在。

    16/08/22 15:54:15 INFO mapreduce.Job: Task Id : attempt_1470135750174_2508_m_000004_2, Status : FAILED
    Error: java.io.IOException: Cannot join values on null key. Did you specify a key column that exists?
        at org.apache.sqoop.mapreduce.MergeMapperBase.processRecord(MergeMapperBase.java:79)
        at org.apache.sqoop.mapreduce.MergeTextMapper.map(MergeTextMapper.java:58)
        at org.apache.sqoop.mapreduce.MergeTextMapper.map(MergeTextMapper.java:34)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)

      并且我们发现使用merge key对记录进行合并主要发生在map阶段,所以如果需要支持多个字段的merge时我们只需要修改MergeMapperBase#processRecord方法即可。修改后的代码如下
      在使用sqoop merge时,多个字段用逗号分隔,把每个字段对应的值取出来拼接成新的key。

        protected void processRecord(SqoopRecord r, Context c)
                throws IOException, InterruptedException {
            MergeRecord mr = new MergeRecord(r, isNew);
            Map<String, Object> fieldMap = r.getFieldMap();
            if (null == fieldMap) {
                throw new IOException("No field map in record " + r);
            }
            Object keyObj = null;
            if (keyColName.contains(",")) {
                String connectStr = new String(new byte[]{1});
                StringBuilder keyFieldsSb = new StringBuilder();
                for (String str : keyColName.split(",")) {
                    keyFieldsSb.append(connectStr).append(fieldMap.get(str).toString());
                }
                keyObj = keyFieldsSb;
            } else {
                keyObj = fieldMap.get(keyColName);
            }
    
            if (null == keyObj) {
                throw new IOException("Cannot join values on null key. "
                        + "Did you specify a key column that exists?");
            } else {
                c.write(new Text(keyObj.toString()), mr);
            }
        }

      上面需要注意的一点是,我的拼接符使用了一个byte的String,这样可以避免以下这种情况。
      假设使用“+”当拼接符,如果存在两条记录:

    Field a Field b
    a+ b
    a +b

      使用字段a,b进行merge时,上面两条不一样的记录最终会被程序认为是相同的,由此会产生新的数据不准确问题。
      
      有关该问题的更多信息可以参考[SQOOP-3002]

    四、多字段的merge

      还是以上面两张表为例进行测试,表sqoop_merge_all使用两个新的分区pt=20160801,pt=20160802
      sqoop_merge_all (pt=20160801)数据
      这里写图片描述
      sqoop_merge_inc数据
      这里写图片描述
      指定--merge-key type,comments进行merge,理论上只有id=1的那一条记录被更新成id=5的那一条。合并后的数据应该包含id=2,id=3,id=4,id=5这四条记录。

    sqoop merge 
      --new-data hdfs://m000/user/hive/warehouse/sqoop_merge_inc 
      --onto hdfs://m000/user/hive/warehouse/sqoop_merge_all/pt=20160801 
      --target-dir hdfs://m000/user/hive/warehouse/sqoop_merge_all/pt=20160802 
      --jar-file /usr/local/sqoop/bindir/sqoop_merge.jar 
      --class-name sqoop_merge 
      --merge-key type,comments

      merge后,最终sqoop_merge_all (pt=20160802)的数据为:
      这里写图片描述

  • 相关阅读:
    Webbrowser 取消下载提示框
    The service ‘xxx’ configured for WCF is not registered with the Autofac container
    Code First 中的 TPH TPT TPC
    SQL Server 之 解锁
    导入 github 步骤
    初试 pyhton 简易采集
    js 一些小技巧
    linux 学习笔记
    lnmp 环境搭建后,pathinfo 模式支持的配制。
    windows 快捷键相关命令
  • 原文地址:https://www.cnblogs.com/wuyida/p/6300229.html
Copyright © 2011-2022 走看看