zoukankan      html  css  js  c++  java
  • flink-sql解析canal-json实现实时同步

    package com.lezhi.business.dxxbs.transmission.table
    
    import com.lezhi.common.{CommonTransmissonFunciton, SystemParams}
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api._
    import org.apache.flink.table.api.bridge.scala._
    
    object user_login {
      def main(args: Array[String]): Unit = {
    
        val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
        val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
        val bnv = StreamTableEnvironment.create(bsEnv, bsSettings)
        val table_name="user_login"
        val primaryKey="USER_ID"
        val table_column=
          """
            |USER_ID  STRING,
            |USER_PHONE  STRING,
            |USER_PWD  STRING,
            |CREAT_TIME  STRING,
            |UPLOAD_TIME  STRING,
            |UNION_ID  STRING,
            |OPEN_ID  STRING
            |""".stripMargin
    
        val sql_source_table="CREATE TABLE source_table_"+table_name+" (" +
          table_column+
          ") WITH (" +
          "'connector' = 'kafka'," +         //连接类型为kafka
          "'topic' = '"+SystemParams.TOPIC+"'," +   //kafka topic名称
          "'properties.bootstrap.servers' = '"+SystemParams.BOOTSTRAP_SERVER+"'," +     //kafka bootstrap.servers配置
          "'scan.startup.mode' = 'earliest-offset'," +   // topic消费位置设置
          "'format' = 'canal-json'," +    //数据格式配置
          "'canal-json.ignore-parse-errors' = 'true'," +    //当解析异常时,忽略字段的解析异常,则会将该字段值设置为null。
          "'canal-json.table.include' ='"+table_name+"')"
    
    
        bnv.executeSql(sql_source_table)
    
    //    bnv.executeSql("select * from source_table_"+table_name).print()
    val sql_result_table ="CREATE TABLE sink_table_"+table_name+" (" +
      table_column+
      ",PRIMARY KEY ("+primaryKey+") NOT ENFORCED" +
      ") WITH (" +
      "'connector' = 'jdbc'," +   //连接类型为jdbc
      "'url' = '"+SystemParams.JDBC_URL_BYMM+"'," +     //he JDBC database url.
      "'table-name' = '"+table_name+"'," +    //连接的表名
      " 'username' ='"+SystemParams.JDBC_USERNAME+"',"+     //连接数据库用户名
      " 'password' ='"+SystemParams.JDBC_PASSWORD+"')"
    
        println(sql_result_table)
        bnv.executeSql(sql_result_table)
        bnv.executeSql("INSERT INTO sink_table_"+table_name+" SELECT * FROM source_table_"+table_name)
    
        bnv.execute(table_name)
    
      }
    }

    注意,下沉时下沉的表必须要有主键,否则会在更新数据时,旧数据和新数据会同时存在

    author@nohert
  • 相关阅读:
    SDUT 2143 图结构练习——最短路径 SPFA模板,方便以后用。。 Anti
    SDUT ACM 1002 Biorhythms 中国剩余定理 Anti
    nyist OJ 119 士兵杀敌(三) RMQ问题 Anti
    SDUT ACM 2157 Greatest Number Anti
    SDUT ACM 2622 最短路径 二维SPFA启蒙题。。 Anti
    二叉索引树 区间信息的维护与查询 Anti
    SDUT ACM 2600 子节点计数 Anti
    UVA 1428 Ping pong 二叉索引树标准用法 Anti
    2010圣诞Google首页效果
    Object
  • 原文地址:https://www.cnblogs.com/gzgBlog/p/15001333.html
Copyright © 2011-2022 走看看