使用sqoop导入增量数据.
核心参数
--check-column
用来指定一些列,这些列在增量导入时用来检查这些数据是否作为增量数据进行导入,和关系行数据库中的自增字段及时间戳类似
这些被指定的列的类型不能使用任意字符类型,如char、varchar等类型都是不可以的,同时 --check-column 可以去指定多个列
--incremental
用来指定增量导入的模式,两种模式分别为append 和 lastmodified
--last-value
指定上一次导入中检查列指定字段的最大值
append模式
- 创建mysql表
CREATE TABLE `sqoop_test` ( `id` int(11) DEFAULT NULL, `name` varchar(255) DEFAULT NULL, `age` int(11) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=latin1
- 创建hive表(表结构与mysql一致)
hive> create external table sqoop_test(id int,name string,age int) > ROW FORMAT DELIMITED > FIELDS TERMINATED BY ',' > STORED AS TEXTFILE > location '/user/hive/external/sqoop_test'; OK Time taken: 0.126 seconds
- 先导入mysql中的原始数据
sqoop import --connect jdbc:mysql://localhost:3306/sqooptest --username root --password 123qwe --table sqoop_test
--hive-import --hive-overwrite --hive-table sqoop_test --fields-terminated-by ',' -m 1
导入成功,查看hive表中的数据
hive> select * from sqoop_test; OK 1 fz 13 3 dx 18 2 test 13 Time taken: 0.074 seconds, Fetched: 3 row(s)
- 在mysql中添加几条增量数据
1 fz 13 3 dx 18 2 test 13 4 test_add_1 14 5 test_add_2 19 6 test-add-3 35 7 test-7 7 8 test-8 8
- 现在开始导入增量数据,增量数据加入前最大id值为3
开始尝试这样写
sqoop import --connect jdbc:mysql://localhost:3306/sqooptest --username root --password 123qwe --table sqoop_test
--hive-import --hive-table sqoop_test --check-column id --incremental append --last-value 3 -m 1
但是提示append模式不支持写入到hive表中
Append mode for hive imports is not yet supported. Please remove the parameter --append-mode
正确写法,直接写入到hdfs
sqoop import --connect jdbc:mysql://localhost:3306/sqooptest --username root --password 123qwe --table sqoop_test
--target-dir '/user/hive/external/sqoop_test' --incremental append --check-column id --last-value 3 -m 1
job完成,在hive查看表数据
hive> select * from sqoop_test; OK 1 fz 13 3 dx 18 2 test 13 4 test_add_1 14 5 test_add_2 19 6 test-add-3 35 7 test-7 7 8 test-8 8 Time taken: 0.075 seconds, Fetched: 8 row(s)
成功。
Lastmodified模式
- Mysql新建一个表customertest
CREATE TABLE customertest ( id INT, name VARCHAR (20), last_mod TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP );
插入数据
insert into customertest(id,name) values(1,'enzo'); insert into customertest(id,name) values(2,'din'); insert into customertest(id,name) values(3,'fz'); insert into customertest(id,name) values(4,'dx'); insert into customertest(id,name) values(5,'ef');
1 enzo 2017-09-20 16:12:54 2 din 2017-09-20 16:12:58 3 fz 2017-09-20 16:13:01 4 dx 2017-09-20 16:13:05 5 ef 2017-09-20 16:13:09
- 在hive中新建表customertest,表结构类似
hive> create table customertest(id int,name string,last_mod string) > ROW FORMAT DELIMITED > FIELDS TERMINATED BY ',' > STORED AS TEXTFILE; OK Time taken: 0.189 seconds
- 导入mysql表中的数据
sqoop import --connect jdbc:mysql://localhost:3306/sqooptest --username root --password 123qwe --table customertest
--hive-import --hive-table customertest --fields-terminated-by ',' -m 1
hive> select * from customertest; OK 1 enzo 2017-09-20 16:12:54.0 2 din 2017-09-20 16:12:58.0 3 fz 2017-09-20 16:13:01.0 4 dx 2017-09-20 16:13:05.0 5 ef 2017-09-20 16:13:09.0 Time taken: 0.064 seconds, Fetched: 5 row(s)
- 此时在mysql中插入一条数据
insert into customertest(id,name) values(6,'enzoDin')
插入之后表中的数据
1 enzo 2017-09-20 16:12:54 2 din 2017-09-20 16:12:58 3 fz 2017-09-20 16:13:01 4 dx 2017-09-20 16:13:05 5 ef 2017-09-20 16:13:09 6 enzoDin 2017-09-20 16:17:53
- 再来根据last_mod来导入增量数据
sqoop import --connect jdbc:mysql://localhost:3306/sqooptest --username root --password 123qwe --table customertest
--hive-import --hive-table customertest --check-column last_mod --incremental lastmodified --last-value "2017-09-20 16:13:09" --fields-terminated-by ',' -m 1
导入成功
hive> select * from customertest; OK 1 enzo 2017-09-20 16:12:54.0 2 din 2017-09-20 16:12:58.0 3 fz 2017-09-20 16:13:01.0 4 dx 2017-09-20 16:13:05.0 5 ef 2017-09-20 16:13:09.0 5 ef 2017-09-20 16:13:09.0 6 enzoDin 2017-09-20 16:17:53.0 Time taken: 0.064 seconds, Fetched: 7 row(s)
查看hive中的数据会发现插入了两条数据,这是因为使用lastmodified 会将大于等于 –last-value 的值都导入进来,所以会造成数据的重复
研究下merge-key....