Sqoop中提供了一个用于合并数据集的工具sqoop-merge
。官方文档中的描述可以参考我的另一篇博客Sqoop-1.4.5用户手册。
Merge的基本原理是,需要指定新数据集和老数据集的路径,根据某个merge-key,在reduce过程中,优先取出新数据集中的数据,共同合并成新的全量数据。具体的逻辑分析可以稍后通过看Sqoop-1.4.6的源码来进一步了解。
但是,在原生的Sqoop中,目前只支持merge-key为一个字段的情况,本文通过分析源代码并对源代码进行更改,可以在使用Sqoop的Merge功能时支持任意多个merge-key。
一、Sqoop Merge Tool使用示例
在这里模拟一次数据增量同步到hive中的过程。
1、数据准备
有一张分区表sqoop_merge_all
,分区字段pt,在hdfs上的文件存储路径为:hdfs://m000/user/hive/warehouse/sqoop_merge_all
,表结构如下:
字段 | 类型 |
---|---|
id | int |
type | string |
comments | string |
updatetime | string |
另外有一张增量表sqoop_merge_inc
,该表的hdfs路径为hdfs://m000/user/hive/warehouse/sqoop_merge_inc
。
看一下这两张表中的数据:
sqoop_merge_all (pt=20160810)
假设这里存的是20160810的全量数据。
sqoop_merge_inc
假设20160811当天,对type=type1
的记录进行了更新,并且新增了一条type=type3
的记录。
现在,需要把sqoop_merge_inc
中的两条记录与sqoop_merge_all
分区 (pt=20160810)
中的两条记录进行合并,合并后的数据存入sqoop_merge_all
的分区 (pt=20160811)
。
按照正常逻辑,id=1
的那条记录被更新成id=3
的那一条,id=4
的记录新增,那么最终sqoop_merge_all
的分区 (pt=20160811)
中应该包含的记录分别为id=2,id=3,id=4
这三条。
2、Sqoop Merge操作
完整的sqoop merge命令如下:
sqoop merge
--new-data hdfs://m000/user/hive/warehouse/sqoop_merge_inc
--onto hdfs://m000/user/hive/warehouse/sqoop_merge_all/pt=20160810
--target-dir hdfs://m000/user/hive/warehouse/sqoop_merge_all/pt=20160811
--jar-file /usr/local/sqoop/bindir/sqoop_merge.jar
--class-name sqoop_merge
--merge-key type
简单说明一下,上面这句命令中表示,将新的数据集(参数--new-data
)与已有的数据集(参数--onto
)进行合并,合并后的数据存入(参数--target-dir
)路径下。需要指定这次合并中使用的表的结构jar包和class。根据type
字段进行合并。
最终结果如下,与上面期望的相一致。
我们看一眼sqoop_merge.jar
以及sqoop_merge.class
的内容,基本上可以理解成这个类是sqoop_merge表的一个java bean定义。
二、Sqoop Merge Tool实现原理
那么,Sqoop是如何实现上面这个功能的呢?
1、脚本分析
在$SQOOP_HOME/bin
路径下,有一个sqoop-merge脚本。
(1)sqoop-merge
该脚本主要逻辑就是调用了sqoop merge
命令,并把类似于--onto
之类的参数都传入。
prgm=`readlink -f $0`
bin=`dirname ${prgm}`
bin=`cd ${bin} && pwd`
exec ${bin}/sqoop merge "$@"
(2)sqoop
该脚本中,对参数进行处理后,最终执行下面这一句。把merge也一并传入。
source ${bin}/configure-sqoop "${bin}"
exec ${HADOOP_COMMON_HOME}/bin/hadoop org.apache.sqoop.Sqoop "$@"
2、Java源代码
查看Sqoop源代码,可以看到Sqoop是使用Java语言实现的。我们首先找到上面脚本中使用的类。由于每个类中的方法调用过程比较麻烦,接下来只分析主要代码,首先,完整的调用链如下所示。
org.apache.sqoop.Sqoop#main
--> org.apache.sqoop.Sqoop#runTool(args)
--> org.apache.sqoop.Sqoop#runTool(args, new Configuration())
--> org.apache.sqoop.Sqoop#runSqoop(Sqoop sqoop, args[1...end])
--> org.apache.hadoop.util.ToolRunner.run()
--> tool.run()
最后这个tool的类型会详细分析。
(1)org.apache.sqoop.Sqoop
这个类是整个调用链的入口,这个类中主要方法的逻辑如下。
在第二个runTool方法调用处,会根据传入的第一个参数(即我们传入的”merge”),生成一个SqoopTool
类型的tool对象。
public static int runTool(String [] args, Configuration conf) {
...
String toolName = expandedArgs[0];
Configuration pluginConf = SqoopTool.loadPlugins(conf);
SqoopTool tool = SqoopTool.getTool(toolName);
...
Sqoop sqoop = new Sqoop(tool, pluginConf);
// 除去"merge"参数之外的其他参数,一起传入runSqoop中
return runSqoop(sqoop, Arrays.copyOfRange(expandedArgs, 1, expandedArgs.length));
}
那么这个tool对象到底是个什么呢?跟踪进入com.cloudera.sqoop.tool#getTool
方法,进入org.apache.sqoop.tool.SqoopTool#getTool
方法。
(2)org.apache.sqoop.tool.SqoopTool
这里面,是从一个名为TOOLS的Map中,根据toolName获取对应的类对象。从下面代码中我们可以看到Sqoop支持的所有Tool,并且merge对应的Tool是MergeTool
类型的。
private static final Map<String, Class<? extends SqoopTool>> TOOLS;
TOOLS = new TreeMap<String, Class<? extends SqoopTool>>();
...
// registerTool方法最终都会向TOOLS中put一个对象
registerTool("codegen", CodeGenTool.class,
"Generate code to interact with database records");
registerTool("create-hive-table", CreateHiveTableTool.class,
"Import a table definition into Hive");
registerTool("eval", EvalSqlTool.class,
"Evaluate a SQL statement and display the results");
registerTool("export", ExportTool.class,
"Export an HDFS directory to a database table");
registerTool("import", ImportTool.class,
"Import a table from a database to HDFS");
registerTool("import-all-tables", ImportAllTablesTool.class,
"Import tables from a database to HDFS");
registerTool("import-mainframe", MainframeImportTool.class,
"Import datasets from a mainframe server to HDFS");
registerTool("help", HelpTool.class, "List available commands");
registerTool("list-databases", ListDatabasesTool.class,
"List available databases on a server");
registerTool("list-tables", ListTablesTool.class,
"List available tables in a database");
registerTool("merge", MergeTool.class,
"Merge results of incremental imports");
registerTool("metastore", MetastoreTool.class,
"Run a standalone Sqoop metastore");
registerTool("job", JobTool.class,
"Work with saved jobs");
registerTool("version", VersionTool.class,
"Display version information");
最前面那个调用链最后那个tool
对象,对应的就是MergeTool类型。那么接下来我们进入到MergeTool#run
中。
(3)org.apache.sqoop.tool.MergeTool
在这个方法中,生成一个MergeJob对象,然后通过该mergeJob的runMergeJob方法,运行一个MapReduce任务。
public int run(SqoopOptions options) {
try {
// Configure and execute a MapReduce job to merge these datasets.
MergeJob mergeJob = new MergeJob(options);
if (!mergeJob.runMergeJob()) {
LOG.error("MapReduce job failed!");
return 1;
}
} catch (IOException ioe) {
...
}
return 0;
}
(4)org.apache.sqoop.mapreduce.MergeJob
这个类的runMergeJob方法是一个标准的MapReduce程序。我们主要跟踪其Mapper类和Reducer类。
public boolean runMergeJob() throws IOException {
...
if (ExportJobBase.isSequenceFiles(jobConf, newPath)) {
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapperClass(MergeRecordMapper.class);
} else {
job.setMapperClass(MergeTextMapper.class);
job.setOutputFormatClass(RawKeyTextOutputFormat.class);
}
...
job.setReducerClass(MergeReducer.class);
}
使用的Reducer类是MergeReducer
,根据文件类型分别生成MergeRecordMapper
和MergeTextMapper
类型的Mapper类。但是,不管表文件类型是什么,这两个Mapper类,最终共同继承了MergeMapperBase
类,并且在各自的map方法中,调用了MergeMapperBase#processRecord
方法,map阶段的主要逻辑也就在该方法中。
(5)org.apache.sqoop.mapreduce.MergeMapperBase
这里我们只分析processRecord方法。
在这个方法中我们看到,每一条记录对应一个MergeRecord对象,这个对象最后会在map的输出中输出到reduce阶段。fieldMap是一个Map类型,其key就是我们通过参数--merge-key
指定的字段,根据该字段名称,从fieldMap中取出当前记录该字段的值,转化成String后,当作map的输出,与MergeRecord对象一起交给Reduce来处理。
protected void processRecord(SqoopRecord r, Context c)
throws IOException, InterruptedException {
MergeRecord mr = new MergeRecord(r, isNew);
Map<String, Object> fieldMap = r.getFieldMap();
if (null == fieldMap) {
throw new IOException("No field map in record " + r);
}
Object keyObj = fieldMap.get(keyColName);
if (null == keyObj) {
throw new IOException("Cannot join values on null key. "
+ "Did you specify a key column that exists?");
} else {
c.write(new Text(keyObj.toString()), mr);
}
}
(6)org.apache.sqoop.mapreduce.MergeReducer
这个类中,reduce方法的逻辑如下。
取出MergeRecord集合中相同key的所有记录,如果新数据集中不包含当前字段值的记录,则从旧的数据集中取该条记录。如果新旧数据集中都有该记录,则从新的数据集中取出该记录。
public void reduce(Text key, Iterable<MergeRecord> vals, Context c)
throws IOException, InterruptedException {
SqoopRecord bestRecord = null;
try {
for (MergeRecord val : vals) {
if (null == bestRecord && !val.isNewRecord()) {
// Use an old record if we don't have a new record.
bestRecord = (SqoopRecord) val.getSqoopRecord().clone();
} else if (val.isNewRecord()) {
bestRecord = (SqoopRecord) val.getSqoopRecord().clone();
}
}
} catch (CloneNotSupportedException cnse) {
throw new IOException(cnse);
}
if (null != bestRecord) {
c.write(bestRecord, NullWritable.get());
}
}
三、Sqoop Merge Tool源码修改
从上面源代码过程分析可以看到,merge过程只能指定一个字段,如果指定多个字段时,会报如下的错,提示当前指定的字段不存在。
16/08/22 15:54:15 INFO mapreduce.Job: Task Id : attempt_1470135750174_2508_m_000004_2, Status : FAILED
Error: java.io.IOException: Cannot join values on null key. Did you specify a key column that exists?
at org.apache.sqoop.mapreduce.MergeMapperBase.processRecord(MergeMapperBase.java:79)
at org.apache.sqoop.mapreduce.MergeTextMapper.map(MergeTextMapper.java:58)
at org.apache.sqoop.mapreduce.MergeTextMapper.map(MergeTextMapper.java:34)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
并且我们发现使用merge key对记录进行合并主要发生在map阶段,所以如果需要支持多个字段的merge时我们只需要修改MergeMapperBase#processRecord
方法即可。修改后的代码如下
在使用sqoop merge时,多个字段用逗号分隔,把每个字段对应的值取出来拼接成新的key。
protected void processRecord(SqoopRecord r, Context c)
throws IOException, InterruptedException {
MergeRecord mr = new MergeRecord(r, isNew);
Map<String, Object> fieldMap = r.getFieldMap();
if (null == fieldMap) {
throw new IOException("No field map in record " + r);
}
Object keyObj = null;
if (keyColName.contains(",")) {
String connectStr = new String(new byte[]{1});
StringBuilder keyFieldsSb = new StringBuilder();
for (String str : keyColName.split(",")) {
keyFieldsSb.append(connectStr).append(fieldMap.get(str).toString());
}
keyObj = keyFieldsSb;
} else {
keyObj = fieldMap.get(keyColName);
}
if (null == keyObj) {
throw new IOException("Cannot join values on null key. "
+ "Did you specify a key column that exists?");
} else {
c.write(new Text(keyObj.toString()), mr);
}
}
上面需要注意的一点是,我的拼接符使用了一个byte的String,这样可以避免以下这种情况。
假设使用“+”当拼接符,如果存在两条记录:
Field a | Field b |
---|---|
a+ | b |
a | +b |
使用字段a,b进行merge时,上面两条不一样的记录最终会被程序认为是相同的,由此会产生新的数据不准确问题。
有关该问题的更多信息可以参考[SQOOP-3002]。
四、多字段的merge
还是以上面两张表为例进行测试,表sqoop_merge_all
使用两个新的分区pt=20160801,pt=20160802
。
sqoop_merge_all (pt=20160801)
数据
sqoop_merge_inc
数据
指定--merge-key type,comments
进行merge,理论上只有id=1
的那一条记录被更新成id=5
的那一条。合并后的数据应该包含id=2,id=3,id=4,id=5
这四条记录。
sqoop merge
--new-data hdfs://m000/user/hive/warehouse/sqoop_merge_inc
--onto hdfs://m000/user/hive/warehouse/sqoop_merge_all/pt=20160801
--target-dir hdfs://m000/user/hive/warehouse/sqoop_merge_all/pt=20160802
--jar-file /usr/local/sqoop/bindir/sqoop_merge.jar
--class-name sqoop_merge
--merge-key type,comments
merge后,最终sqoop_merge_all (pt=20160802)
的数据为: