Flink 1.11 版本对SQL的优化是很多的,其中最重要的一点就是 hive 功能的完善,不再只是作为持久化的 Catalog,
而是可以用原生的 Flink SQL 流式的写数据到入 hive中
本文使用官网 “Streaming Writing” 案例 (https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html#streaming-writing),
流式写数据到 Hive (刚好之前有同学咨询官网的例子不能写入成功)
The Hive Streaming Sink re-use Filesystem Streaming Sink to integrate Hadoop OutputFormat/RecordWriter to streaming writing. Hadoop RecordWriters are Bulk-encoded Formats, Bulk Formats rolls files on every checkpoint.
Hive Streaming Sink 重用 Filesystem Streaming Sink,集成Hadoop OutputFormat / RecordWriter 流式写入。 Hadoop RecordWriters是 Bulk-encoded 格式,Bulk 格式在每个 checkpoint 上滚动文件。
环境:
Flink 1.11.2
Hive 2.3.6
Hadoop 2.7
sqlSubmit,我开源 Flink SQL 提交程序(Table Api 的方式提交 SQL,代码已提交 Github:https://github.com/springMoon/sqlSubmit)
官网SQL 如下:
SET table.sql-dialect=hive; -- 要指定 hive 方言,不然 hive 表创建不成功 CREATE TABLE hive_table ( user_id STRING, order_amount DOUBLE ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', -- hive 分区提取器 'sink.partition-commit.trigger'='partition-time', -- 分区触发提交 'sink.partition-commit.delay'='1 h', -- 提交延迟 'sink.partition-commit.policy.kind'='metastore,success-file' -- 提交类型 ); SET table.sql-dialect=default; -- 换回 default 方言 CREATE TABLE kafka_table ( user_id STRING, order_amount DOUBLE, log_ts TIMESTAMP(3), WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND ) WITH (...); -- kafka 表的 tblproperties -- streaming sql, insert into hive table 写入的 sql, 最后两个字段是 是写入分区 INSERT INTO TABLE hive_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table; -- batch sql, select with partition pruning SELECT * FROM hive_table WHERE dt='2020-05-20' and hr='12';
官网的案例默认是在 sql-client 中执行的,这里是用 Table Api,所以会有点不同,先看下完整的 SQL
drop table if exists user_log; CREATE TABLE user_log ( user_id VARCHAR ,item_id VARCHAR ,category_id VARCHAR ,behavior VARCHAR ,ts TIMESTAMP(3) ,WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( 'connector.type' = 'kafka' ,'connector.version' = 'universal' ,'connector.topic' = 'user_behavior' ,'connector.properties.zookeeper.connect' = 'venn:2181' ,'connector.properties.bootstrap.servers' = 'venn:9092' ,'connector.properties.group.id' = 'user_log' ,'connector.startup-mode' = 'group-offsets' ,'connector.sink-partitioner' = 'fixed' ,'format.type' = 'json' ); -- set table.sql-dialect=hive; -- kafka sink drop table if exists hive_table; CREATE TABLE hive_table ( user_id STRING ,item_id STRING ,category_id STRING ,behavior STRING ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='1 min', 'sink.partition-commit.policy.kind'='metastore,success-file' ); -- streaming sql, insert into hive table INSERT INTO TABLE hive_table SELECT user_id, item_id, category_id, behavior, DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH') FROM user_log;
跟官网基本一样,唯一的不同是,在指定 sql 方言的时候,Table Api 是这样的:
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tabEnv.getConfig().setSqlDialect(SqlDialect.HIVE)
flink 方言官网: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_dialect.html#table-api
注:
partition.time-extractor.timestamp-pattern 指定分区提取器提取时间戳的格式
sink.partition-commit.trigger 触发分区提交的类型可以指定 "process-time" 和 "partition-time" 处理时间和分区时间
如指定天、小时、分钟三级分区:
partition.time-extractor.timestamp-pattern = $dt $hr:$ms:00
分区字段则是: DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm')
hive 表数据如下:
hive> select * from hive_table limit 10; OK 107025 1007359 4391936 pv 2017-11-26 01 334124 3904483 2520771 cart 2017-11-26 01 475192 3856358 2465336 pv 2017-11-26 01 475192 3856358 2465336 pv 2017-11-26 01 864482 3398512 1639158 pv 2017-11-26 01 987980 3225231 2578647 pv 2017-11-26 01 987980 3225231 2578647 pv 2017-11-26 01 563592 3377194 2131531 pv 2017-11-26 01 939115 241366 4756105 pv 2017-11-26 01 939115 241366 4756105 pv 2017-11-26 01 Time taken: 0.112 seconds, Fetched: 10 row(s)
hive 表对于目录文件情况:
[venn@venn ~]$ hadoop fs -ls /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/ Found 7 items -rw-r--r-- 1 venn supergroup 0 2020-09-24 17:04 /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/.part-35464358-55dc-4d76-a174-ba50b1f97c1b-0-5.inprogress.42f75ffc-8c4d-4009-a00a-93482a96a2b8 -rw-r--r-- 1 venn supergroup 0 2020-09-24 17:02 /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/_SUCCESS -rw-r--r-- 1 venn supergroup 7190 2020-09-24 16:56 /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/part-35464358-55dc-4d76-a174-ba50b1f97c1b-0-0 -rw-r--r-- 1 venn supergroup 3766 2020-09-24 16:58 /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/part-35464358-55dc-4d76-a174-ba50b1f97c1b-0-1 -rw-r--r-- 1 venn supergroup 3653 2020-09-24 17:00 /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/part-35464358-55dc-4d76-a174-ba50b1f97c1b-0-2 -rw-r--r-- 1 venn supergroup 3996 2020-09-24 17:02 /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/part-35464358-55dc-4d76-a174-ba50b1f97c1b-0-3 -rw-r--r-- 1 venn supergroup 3719 2020-09-24 17:04 /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/part-35464358-55dc-4d76-a174-ba50b1f97c1b-0-4
.part-xxx 文件就是正在写的文件,下面几个就是已经提交的文件
在做官网的案例的过程,还算比较顺利,但是也遇到几个问题:
- 1、jar 包问题,写 hive 需要 hadoop-mapreduce-client-core-2.7.7.jar
- 2、参数 sink.partition-commit.delay 的单位支持: DAYS: (d | day | days), HOURS: (h | hour | hours), MINUTES: (min | minute | minutes), SECONDS: (s | sec | secs | second | seconds), MILLISECONDS: (ms | milli | millis | millisecond | milliseconds), MICROSECONDS: (µs | micro | micros | microsecond | microseconds), NANOSECONDS: (ns | nano | nanos | nanosecond | nanoseconds)
- 3、hive 表日期类型字段,目前是只支持 TIMESTAMP(9),但是 flink 的 timestamp 是 3 位与 6 位(放弃 日期类型,反正 String 类型的日期 hive 也可以识别)
```java
java.time.format.DateTimeParseException: Text '2017-11-26-01 00:00:00' could not be parsed, unparsed text found at index 10
```
- 4、web 页面 metrics,source 块 往 Sink 块写数 Records Sent 的问题, Records Sent 数对应 checkpoint 次数,因为只会在 checkpoint 的时候才会提交数据到 HDFS,这个消息应该是某个信号数据,而不是真实的数据条数 (上面贴的官网说明有讲,如果没有 checkpoint,数据会写到 hdfs,但是会出于 inprogress状态,并且是 "." 开头的文件,对 hive 来说是隐藏文件,查不到的)
再贴下flink lib:
flink-connector-hbase_2.11-1.11.2.jar flink-json-1.11.2.jar hbase-common-2.1.4.jar hive-exec-2.3.6.jar log4j-slf4j-impl-2.12.1.jar flink-connector-hive_2.11-1.11.2.jar flink-shaded-zookeeper-3.4.14.jar hbase-protocol-2.1.4.jar htrace-core4-4.2.0-incubating.jar metrics-core-3.2.1.jar flink-connector-kafka_2.11-1.11.2.jar flink-table_2.11-1.11.2.jar hbase-protocol-shaded-2.1.4.jar kafka-clients-2.2.0.jar flink-connector-kafka-base_2.11-1.11.2.jar flink-table-blink_2.11-1.11.2.jar hbase-shaded-miscellaneous-2.1.0.jar log4j-1.2-api-2.12.1.jar flink-csv-1.11.2.jar hadoop-mapreduce-client-core-2.7.7.jar hbase-shaded-netty-2.1.0.jar log4j-api-2.12.1.jar flink-dist_2.11-1.11.2.jar hbase-client-2.1.4.jar hbase-shaded-protobuf-2.1.0.jar log4j-core-2.12.1.jar
欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文