zoukankan      html  css  js  c++  java
  • Flink 中定时加载外部数据

    社区中有好几个同学问过这样的场景:  

    flink 任务中,source 进来的数据,需要连接数据库里面的字段,再做后面的处理

    这里假设一个 ETL 的场景,输入数据包含两个字段 “type, userid....” ,需要根据 type,连接一张 mysql 的配置表,关联 type 对应的具体内容。相对于输入数据的数量,type 的值是很少的(这里默认只有10种), 所以对应配置表就只有10条数据,配置是会定时修改的(比如跑批补充数据),配置的修改必须在一定时间内生效。

    实时 ETL,需要用里面的一个字段去关联数据库,补充其他数据,进来的数据中关联字段是很单一的(就10个),对应数据库的数据也很少,如果用 异步 IO,感觉会比较傻(浪费资源、性能还不好)。同时数据库的数据是会不定时修改的,所以不能在启动的时候一次性加载。

    Flink 现在对应这种场景可以使用  Boradcase state 做,如:基于Broadcast 状态的Flink Etl Demo

    这里想说的是另一种更简单的方法: 使用定时器,定时加载数据库的数据  (就是简单的Java定时器)

    先说一下代码流程:

    1、自定义的 source,输入逗号分隔的两个字段

    2、使用 RichMapFunction  转换数据,在 open 中定义定时器,定时触发查询 mysql 的任务,并将结果放到一个 map 中

    3、输入数据关联 map 的数据,然后输出

    先看下数据库中的数据:

    mysql> select * from timer;
    +------+------+
    | id   | name |
    +------+------+
    | 0    | 0zOq |
    | 1    | 1hKC |
    | 2    | 2ibM |
    | 3    | 3fCe |
    | 4    | 4TaM |
    | 5    | 5URU |
    | 6    | 6WhP |
    | 7    | 7zjn |
    | 8    | 8Szl |
    | 9    | 9blS |
    +------+------+
    10 rows in set (0.01 sec)

    总共10条数据,id 就是对应的关联字段,需要填充的数据是 name

    下面是主要的代码:// 自定义的source,输出 x,xxx 格式随机字符

        val input = env.addSource(new TwoStringSource)
        val stream = input.map(new RichMapFunction[String, String] {
    
          val jdbcUrl = "jdbc:mysql://venn:3306?useSSL=false&allowPublicKeyRetrieval=true"
          val username = "root"
          val password = "123456"
          val driverName = "com.mysql.jdbc.Driver"
          var conn: Connection = null
          var ps: PreparedStatement = null
          val map = new util.HashMap[String, String]()
    
          override def open(parameters: Configuration): Unit = {
            logger.info("init....")
            query()
            // new Timer
            val timer = new Timer(true)
            // schedule is 10 second 定义了一个10秒的定时器,定时执行查询数据库的方法
    timer.schedule(new TimerTask { override def run(): Unit = { query() } }, 10000, 10000) } override def map(value: String): String = { // concat input and mysql data,简单关联输出 value + "-" + map.get(value.split(",")(0)) } /** * query mysql for get new config data */ def query() = { logger.info("query mysql") try { Class.forName(driverName) conn = DriverManager.getConnection(jdbcUrl, username, password) ps = conn.prepareStatement("select id,name from venn.timer") val rs = ps.executeQuery while (!rs.isClosed && rs.next) { val id = rs.getString(1) val name = rs.getString(2)
    // 将结果放到 map 中 map.put(id, name) } logger.info(
    "get config from db size : {}", map.size()) } catch { case e@(_: ClassNotFoundException | _: SQLException) => e.printStackTrace() } finally { ps.close() conn.close() } } }) // .print() val sink = new FlinkKafkaProducer[String]("timer_out" , new MyKafkaSerializationSchema[String]() , Common.getProp , FlinkKafkaProducer.Semantic.EXACTLY_ONCE) stream.addSink(sink)

    简单的Java定时器:

    val timer = new Timer(true)
    // schedule is 10 second, 5 second between successive task executions
    timer.schedule(new TimerTask {
      override def run(): Unit = {
        query()
      }
    }, 10000, 10000)

    ------------------20200327 改---------------------

    之前 博客写的有问题,public void schedule(TimerTask task, long delay, long period) 的第三个参数才是重复执行的时间间隔,0 是不执行,我之前写的时候放上去的案例,调用的 Timer 的构造方法是: public void schedule(TimerTask task, long delay) 只会在 delay 时间后调用一次,并不会重复执行,不需要 调用 :  public void schedule(TimerTask task, long delay, long period)   这样的构造方法,才能真正的定时执行。

    使用之前的方法执行的,会看到query 方法执行了两次,是 open 中主动调用了一次和 之后调度了一次,定时器就结束了。

    感谢社区大佬指出

    同时社区还有大佬指出 : ScheduledExecutorService 会比 timer 更好;理由: Timer里边的逻辑失败的话不会抛出任何异常,直接结束,建议用ScheduledExecutorService替换Timer并且捕获下异常看看

    ------------------------------------

    看下输出的数据:

    7,N-7zjn
    7,C-7zjn
    7,U-7zjn
    4,T-4TaM
    7,J-7zjn
    9,R-9blS
    4,C-4TaM
    9,T-9blS
    4,A-4TaM
    6,I-6WhP
    9,U-9blS

    注:“-” 之前是原始数据,后面是关联后的数据

    部署到服务器上定时器的调度:

    2019-09-28 18:28:13,476 INFO  com.venn.stream.api.timer.CustomerTimerDemo$                  - query mysql
    2019-09-28 18:28:13,480 INFO  com.venn.stream.api.timer.CustomerTimerDemo$                  - get config from db size : 10
    2019-09-28 18:28:18,553 INFO  org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - FlinkKafkaProducer 0/1 - checkpoint 17 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=null, producerId=-1, epoch=-1], transactionStartTime=1569666488499} from checkpoint 17
    2019-09-28 18:28:23,476 INFO  com.venn.stream.api.timer.CustomerTimerDemo$                  - query mysql
    2019-09-28 18:28:23,481 INFO  com.venn.stream.api.timer.CustomerTimerDemo$                  - get config from db size : 10
    2019-09-28 18:28:28,549 INFO  org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - FlinkKafkaProducer 0/1 - checkpoint 18 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=null, producerId=-1, epoch=-1], transactionStartTime=1569666498505} from checkpoint 18
    2019-09-28 18:28:33,477 INFO  com.venn.stream.api.timer.CustomerTimerDemo$                  - query mysql
    2019-09-28 18:28:33,484 INFO  com.venn.stream.api.timer.CustomerTimerDemo$                  - get config from db size : 10

    十秒调度一次

     欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文

     

  • 相关阅读:
    leecode 240. 搜索二维矩阵 II
    leecode 103. 二叉树的锯齿形层序遍历
    leecode 362. 敲击计数器
    leecode 152. 乘积最大子数组
    leecode 560. 和为K的子数组
    面试题 08.12. 八皇后
    leecode 450. 删除二叉搜索树中的节点
    leecode 384. 打乱数组
    leecoode 138. 复制带随机指针的链表
    mysql基本指令2
  • 原文地址:https://www.cnblogs.com/Springmoon-venn/p/11604786.html
Copyright © 2011-2022 走看看