zoukankan      html  css  js  c++  java
  • flink-sql解析canal-json实现实时同步

    package com.lezhi.business.dxxbs.transmission.table
    
    import com.lezhi.common.{CommonTransmissonFunciton, SystemParams}
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api._
    import org.apache.flink.table.api.bridge.scala._
    
    object user_login {
      def main(args: Array[String]): Unit = {
    
        val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
        val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
        val bnv = StreamTableEnvironment.create(bsEnv, bsSettings)
        val table_name="user_login"
        val primaryKey="USER_ID"
        val table_column=
          """
            |USER_ID  STRING,
            |USER_PHONE  STRING,
            |USER_PWD  STRING,
            |CREAT_TIME  STRING,
            |UPLOAD_TIME  STRING,
            |UNION_ID  STRING,
            |OPEN_ID  STRING
            |""".stripMargin
    
        val sql_source_table="CREATE TABLE source_table_"+table_name+" (" +
          table_column+
          ") WITH (" +
          "'connector' = 'kafka'," +         //连接类型为kafka
          "'topic' = '"+SystemParams.TOPIC+"'," +   //kafka topic名称
          "'properties.bootstrap.servers' = '"+SystemParams.BOOTSTRAP_SERVER+"'," +     //kafka bootstrap.servers配置
          "'scan.startup.mode' = 'earliest-offset'," +   // topic消费位置设置
          "'format' = 'canal-json'," +    //数据格式配置
          "'canal-json.ignore-parse-errors' = 'true'," +    //当解析异常时,忽略字段的解析异常,则会将该字段值设置为null。
          "'canal-json.table.include' ='"+table_name+"')"
    
    
        bnv.executeSql(sql_source_table)
    
    //    bnv.executeSql("select * from source_table_"+table_name).print()
    val sql_result_table ="CREATE TABLE sink_table_"+table_name+" (" +
      table_column+
      ",PRIMARY KEY ("+primaryKey+") NOT ENFORCED" +
      ") WITH (" +
      "'connector' = 'jdbc'," +   //连接类型为jdbc
      "'url' = '"+SystemParams.JDBC_URL_BYMM+"'," +     //he JDBC database url.
      "'table-name' = '"+table_name+"'," +    //连接的表名
      " 'username' ='"+SystemParams.JDBC_USERNAME+"',"+     //连接数据库用户名
      " 'password' ='"+SystemParams.JDBC_PASSWORD+"')"
    
        println(sql_result_table)
        bnv.executeSql(sql_result_table)
        bnv.executeSql("INSERT INTO sink_table_"+table_name+" SELECT * FROM source_table_"+table_name)
    
        bnv.execute(table_name)
    
      }
    }

    注意,下沉时下沉的表必须要有主键,否则会在更新数据时,旧数据和新数据会同时存在

    author@nohert
  • 相关阅读:
    Debian安装autoconf
    Linux 解决 bash ./ 没有那个文件或目录 的方法
    C语言strtok()函数:字符串分割
    java.util.logging.Logger使用详解 (转)
    java中Logger.getLogger(Test.class)
    jquery ajax中success与complete的执行顺序 (转)
    navicat如何导入sql文件和导出sql文件
    MySQL修改root密码的多种方法(转)
    查看三种MySQL字符集的方法(转)
    mysql 5.7.13 安装配置方法图文教程(linux) (转)
  • 原文地址:https://www.cnblogs.com/gzgBlog/p/15001333.html
Copyright © 2011-2022 走看看