zoukankan      html  css  js  c++  java
  • Flink 1.10 SQL 读写Kafka

    最近因为疫情的原因,偷了好长时间的懒,现在终于开始继续看Flink 的SQL 了 

    ————————————————

    电脑上的Flink 项目早就升级到了 1.10了,最近还在看官网新的文档,趁着周末,体验一下新版本的SQL API(踩一下坑)。

    直接从之前的 云邪大佬的Flink 的 SQL 样例开始(pom 已经提前整理好了)。

    简单回忆一下内容,就是从kafka 接收 用户行为,根据时间分组,求PV 和UV ,然后输出到 mysql 中。

    先看下加的 依赖:

        <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table</artifactId>
        <version>${flink.version}</version>
        <type>pom</type>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- or... -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>

    table 相关的有这些,注意几个新的依赖,如:flink-jdbc_2.11-1.10.0.jar

    看下对应的sql文件:

    --sourceTable
    CREATE TABLE user_log (
        user_id VARCHAR,
        item_id VARCHAR,
        category_id VARCHAR,
        behavior VARCHAR,
        ts TIMESTAMP(3)
    ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'user_behavior',
        'connector.startup-mode' = 'earliest-offset',
        'connector.properties.0.key' = 'zookeeper.connect',
        'connector.properties.0.value' = 'venn:2181',
        'connector.properties.1.key' = 'bootstrap.servers',
        'connector.properties.1.value' = 'venn:9092',
        'update-mode' = 'append',
        'format.type' = 'json',
        'format.derive-schema' = 'true'
    );
    
    --sinkTable
    CREATE TABLE pvuv_sink (
        dt VARCHAR,
        pv BIGINT,
        uv BIGINT
    ) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://venn:3306/venn',
        'connector.table' = 'pvuv_sink',
        'connector.username' = 'root',
        'connector.password' = '123456',
        'connector.write.flush.max-rows' = '1'
    );
    
    --insert
    INSERT INTO pvuv_sink(dt, pv, uv)
    SELECT
      DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
      COUNT(*) AS pv,
      COUNT(DISTINCT user_id) AS uv
    FROM user_log
    GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00');

    执行

    遇到的第一个问题就是: "Type TIMESTAMP(6) of table field 'ts' does not match with the physical type TIMESTAMP(3) of the 'ts' field of the TableSource return type"

    看起来 TIMESTAMP 默认的是 TIMESTAMP(6) ,与 source 中的 TIMESTAMP("ts": "2017-11-26T01:00:01Z")不匹配,直接将 ts 的数据类型改为 : TIMESTAMP(3),搞定。

    好像没有其他坑了,直接可以执行,数据也输出到myql 中了

     之后,从sql 的 connector 开始,先看了下 kafak的,Flink 1.10 SQL 中,kafka 只支持 csv、json 和 avro 三种类型。(试了下  json 和 csv)

    两个sql程序,包含读写 json、csn。

    直接将上面的table sink 的sql 修改成写kafak:

    --sinkTable
    CREATE TABLE user_log_sink (
        dt VARCHAR,
        pv BIGINT,
        uv BIGINT
    ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'user_behavior_sink',
        'connector.properties.zookeeper.connect' = 'venn:2181',
        'connector.properties.bootstrap.servers' = 'venn:9092',
        'update-mode' = 'append',
        'format.type' = 'json'
    );

    然而,并不能执行。

    报了如下的错:

    AppendStreamTableSink requires that Table has only insert changes.

    WTF,上面的 'update-mode' 明明写的是  'append

    然后,我就开始了一段,并没有什么鸟用的操作:看官网文档、修改sql 的配置。。

    ---------------此处花了不少时间-----------------

    直到最后,突发奇想,直接将source 的内容输出呢,不做任何转换:

    --insert
    INSERT INTO user_log_sink(dt, pv, uv)
    SELECT user_id, item_id, category_id, behavior, ts
    FROM user_log;

    2020-03-18 改: 突然发现 表后面加了 字段,是忘了删的,但是也没报错,5个字段都写到kafka了”

    sink 部分也随之修改:

    --sinkTable
    CREATE TABLE user_log_sink (
        user_id VARCHAR,
        item_id VARCHAR,
        category_id VARCHAR,
        behavior VARCHAR,
        ts TIMESTAMP(3)
    ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'user_behavior_sink_1',
        'connector.properties.zookeeper.connect' = 'venn:2181',
        'connector.properties.bootstrap.servers' = 'venn:9092',
        'update-mode' = 'append',
        'format.type' = 'json'
    );

    就好了,好了,了。。

    哎,等官网文档,看完了,应该就知道是为什么了(注:等知道后,来加上)

    然后就开始了最后一个坑。

    在写csv 的时候,遇到了最后一个坑,之前的版本里,“flink-shaded-jackso”我一直用的 “2.7.9-3.0”,但是里面并没有 CsvSchame,所以又有了这个报错:

    Caused by: java.lang.ClassNotFoundException: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema$Builder

    将 flink-shaded-jackso 的版本换成了flink 代码里的版本 “2.9.8-7.0” 

    基本上,就顺畅的完成了kafka connector 读写 json 和 csv 了。

    最后贴上完整的SQL:

    --sourceTable
    CREATE TABLE user_log(
        user_id VARCHAR,
        item_id VARCHAR,
        category_id VARCHAR,
        behavior VARCHAR,
        ts TIMESTAMP(3)
    ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'user_behavior',
        'connector.properties.zookeeper.connect' = 'venn:2181',
        'connector.properties.bootstrap.servers' = 'venn:9092',
        'connector.startup-mode' = 'earliest-offset',
        'format.type' = 'json'
    #    'format.type' = 'csv'
    );
    
    --sinkTable
    CREATE TABLE user_log_sink (
        user_id VARCHAR,
        item_id VARCHAR,
        category_id VARCHAR,
        behavior VARCHAR,
        ts TIMESTAMP(3)
    ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'user_behavior_sink',
        'connector.properties.zookeeper.connect' = 'venn:2181',
        'connector.properties.bootstrap.servers' = 'venn:9092',
        'update-mode' = 'append',
    #    'format.type' = 'json'
         'format.type' = 'csv'
    );
    
    --insert
    INSERT INTO user_log_sink(dt, pv, uv)
    SELECT user_id, item_id, category_id, behavior, ts
    FROM user_log;

    相关的SQL文件上传到github 上了:  flink-rookic  ,pom.xml 内的依赖也有更新。

    好久没写了,最近会简单尝试一遍SQL 的 kafak/mysql/hbase/es/file/hdfs 等 connector,然后再尝试 SQL 的其他内容

    欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文

  • 相关阅读:
    聊聊豆瓣阅读kindle版..顺便悼念一下library.nu…
    PhoneGap+jQm webapp本地化(1)环境搭建以及资源介绍
    尝试分析Q群作为技术群是个不恰当的选择!
    某android平板项目开发笔记计划任务备份
    android 自动化测试的傻瓜实践之旅(UI篇) 小试身手
    latex/Xelatex书籍排版总结顺便附上一本排好的6寸android书…
    某android平板项目开发笔记自定义sharepreference UI
    android ORM框架的性能简单测试(androrm vs ormlite)
    网络管理员必学手册
    PPT插入FLV视频文件的简单方法
  • 原文地址:https://www.cnblogs.com/Springmoon-venn/p/12498883.html
Copyright © 2011-2022 走看看