zoukankan      html  css  js  c++  java
  • Flink SQL任务自动生成与提交

    起因

    事情的起因,是看到一篇公众号文章Apache Flink 在汽车之家的应用与实践,里面提到了“基于 SQL 的开发流程”。在平台提供以上功能的基础上,用户可以快速的实现 SQL 作业的开发:
    创建一个 SQL 任务;

    1.编写 DDL 声明 Source 和 Sink;

    2.编写 DML,完成主要业务逻辑的实现;

    3.在线查看结果,若数据符合预期,添加 INSERT INTO 语句,写入到指定 Sink 中即可。

    即这种功能:

    之前也写过一个spark自动提交任务的小项目,但始终无法做到简单交互自动生成任务,因而最终没有用到生产环境当中。Flink的SQL是Flink生态里的一等公民,可以做到直接一条SQL语句把数据源source加载到一张表,也可以一条SQL语句把数据写入到sink终端。这让交互式自动生成任务能够简单实现。

    本文前提是,简单实现。如果不考虑性价比,当然也可以做到一个WEB系统,全交互式,点点点,需要的什么参数,什么功能提交到后台,解析生成任务也是可以做的,但这样性价比不高。

    思路

    1.首先是一个简单的WEB系统,后台springboot可以快速实现,前端thymeleaf可以快速做一个简单的页面,不考虑CSS,JS。

    2.然后提供一个页面,传入flink集群参数,mainclass,parallelism等,以及业务参数,主要是sourceSql,sinkSql,transformationSql。然后后台提供一个Flink任务的模板,将sourceSql,sinkSql,transformationSql这三个参数替换掉。再编译成class并生成jar包。

    3.然后将jar包提交到flink集群,再将任务提交到集群。
    这一点可以通过Flink提供的restapi实现。
    https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/ops/rest_api/

    关于第2点,也可以在模板当中以提供参数的形式传入sourceSql,sinkSql,transformationSql,flink 在提交任务的接口里也提供了这样的参数program-args,但只适合简单的参数,像sql语句这种带大量的引号,空格,换行符不建议这样传参。

    实现

    功能本身实现起来倒没太大问题。代码就不提供了,光占篇幅。有兴趣的同学可以看github仓库,这里

    1.配置

    需要修改application的以下4个配置
    分别是
    flink集群的访问地址,
    布置WEB服务的根目录地址,
    编译flink任务所依赖的jar包目录,
    flink任务main class,也就是flink任务模板的路径(不要写成包名的格式)
    flinkurl=http://localhost:8081
    rootpath=D://ideaproject//test//flink_remotesubmit
    dependmentpath=D://ideaproject//test//flink_remotesubmit//lib
    mainpath=//src//main//java//com//yp//flink//model//TableApiModel.java

    2.界面如下

    提交成功后,跳转到flink任务界面

    3.环境

    基于
    flink 1.13.2
    spring boot 2.5

    问题

    遇到的问题,来自于汽车之家的那张图。就文章里表述的需求来说,需要每天按小时统计PV,UV。

    1.flink 的kafka connector 目前只支持批模式(有界数据),不支持流模式(无界)。

    如图中红色标记处。
    如果按inStreamingMode,那就是实时统计,没法设定batch频次;如果按inBatchMode批次,上图可见,目前不支持。而且kafka connector里参数也只支持kafka的起始位置,不支持结束位置。
    所以这里怎么实现的?目前不得而知。

    2.还是如上图所见,在sink的时候是只支持append模式,而在append模式下,不支持group by,因为使用了group by 会改行结果行。
    如果强行使用group by 将会抛出异常:
    目前sink只支持append模式,如果使用了group by 等会改变结果行,会报错:AppendStreamTableSink doesn't support consuming update changes which is produced by node GroupAggregate

    所以对kafka数据源进行sink的怎样进行分组聚合?

    Flink SQL还是非常强大的。前几天发布的flink 14版本,批执行模式现在支持在同一应用中混合使用 DataStream API 和 SQL/Table API(此前仅支持单独使用 DataStream API 或 SQL/Table API)。也引入了更多的connetor。

    苍茫之天涯,乃吾辈之所爱也;浩瀚之程序,亦吾之所爱也,然则何时而爱耶?必曰:先天下之忧而忧,后天下之爱而爱也!
  • 相关阅读:
    Kanzi Studio中的概念
    Linux基本操作1
    Kanzi入门
    Kanzi UI Solution
    Linux下内存占用和CPU占用的计算
    AD19新功能之交互式等长
    AD19新功能之跟随走线
    AD19新功能之Gloss Selected(修线)
    AD19新功能之ActiveRoute
    RT-Thread--内核移植
  • 原文地址:https://www.cnblogs.com/eryuan/p/15406486.html
Copyright © 2011-2022 走看看