zoukankan      html  css  js  c++  java
  • flink-Stream解析canal-json数据

    引入依赖

        <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.33</version>
            </dependency>
    

      

    val env = StreamExecutionEnvironment.getExecutionEnvironment
        println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date)+"flink 代码开始运行")
        val begin_date = new EQTJStreamUtil().getParamDate(ParameterTool.fromArgs(args))
        println(begin_date)
    
        //添加kakka数据源
    
        val reportStreamSouce = env.addSource(new FlinkKafkaConsumer[String]("bymm_topic", new SimpleStringSchema(), new EQTJStreamUtil().getKafkaProps())
          .setStartFromEarliest())  //设置消费kafka位置
          .map(JSON.parseObject(_))
          .filter(_.get("table")=="epidemic_report")
          .filter(_.get("type").toString.matches("(INSERT|UPDATE)"))
          .map(_.getJSONArray("data").getObject(0,new Dxxbs_epidemic_report().getClass))
    //      .filter(_.getSet_id=="1")
          .filter(_.getCreat_time > begin_date)
    author@nohert
  • 相关阅读:
    适配器模式(Adapter)
    状态模式(State)
    观察者模式(Publish/Subscribe)
    建造者模式(Builder)
    数据库,知识点汇总
    数据库
    css样式大全
    遮罩层
    js数组冒泡
    js基本方法
  • 原文地址:https://www.cnblogs.com/gzgBlog/p/14928301.html
Copyright © 2011-2022 走看看