hive和mysql数据互导,首先想到的是sqoop,并且可以和调度框架(比如oozie等)配合配置定时任务,还有一种更简单的方式是通过spark-sql:
CREATE OR REPLACE TEMPORARY VIEW tmp_tbl_test USING org.apache.spark.sql.jdbc OPTIONS ( url "jdbc:mysql://1.1.1.1:3306/db_test?useUnicode=true&characterEncoding=utf-8&tinyInt1isBit=false&zeroDateTimeBehavior=convertToNull", dbtable "tbl_test", user 'root', password '123456');
然后可以在spark-sql中对tmp_tbl_test读或者写,相当于直接对mysql进行读或者写,
如果只需要读,甚至不需要将数据从mysql先导到hive,而是直接读mysql;
如果需要并发读取,可以增加参数
partitionColumn "id", lowerBound "10000", upperBound "20000", numPartitions "4"
另外注意如果where里的查询条件设置不当,可能导致全表扫描,具体可以通过explain查询计划来确认,比如常见的时间类型:
select * from test_table where create_time > '2019-01-01'
== Physical Plan == *(2) HashAggregate(keys=[], functions=[count(1)]) +- Exchange SinglePartition +- *(1) HashAggregate(keys=[], functions=[partial_count(1)]) +- *(1) Project +- *(1) Filter (cast(create_time#2 as string) > 2019-01-01) +- *(1) Scan JDBCRelation(test_table) [numPartitions=1] [create_time#1643] PushedFilters: [], ReadSchema: struct<create_time:timestamp> Time taken: 0.034 seconds, Fetched 1 row(s)
以上查询计划会将test_table整个scan到spark中然后做filter过滤create_time,而不是将查询条件push down到scan中,显然这不是期望的行为,修改查询条件为
select * from test_table where create_time > date_sub(now(), 100)
则可解决问题;
另外注意:
1)连接串中的tinyInt1isBit=false,由于spark-sql和sqoop都是基于jdbc来读mysql,然后jdbc中会将mysql的字段类型tinyint默认认为是java.sql.Types.BIT,进而读出来的不是数字int,而是布尔值Boolean,如果不需要这种默认行为,则需要在连接串中增加tinyInt1isBit=false;
2)连接串中的zeroDateTimeBehavior=convertToNull,如果mysql日期字段值为0000-00-00 00:00:00,从spark中读取会报错:
java.sql.SQLException: Value '0000-00-00 00:00:00' can not be represented as java.sql.Timestamp
设置 zeroDateTimeBehavior 属性,当遇到 DATETIME 值完全由 0 组成时,最终的有效值可以设置为,异常(exception),一个近似值(round),或将这个值转换为 null(convertToNull)。
参考:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala