  • Flink MysqlSink: A Simple Example

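    The big data world has many NoSQL databases, HBase being a typical one, that support fast queries over large data volumes, yet relational databases still cannot be replaced. In my last project, for example, the computed results were still written to a relational database. Flink ships no ready-made MySQL sink for the DataStream API (the flink-jdbc module offers only a generic JDBCOutputFormat). Someone asked how to build one, so here is a simple demo.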

    A Flink sink can write data out in two ways:

    Extend RichSinkFunction
    Implement the OutputFormat interface

    Both approaches are shown below. Each writes to the same MySQL table.

    The MySQL table structure is as follows:

    mysql> desc user;
    +----------+-------------+------+-----+---------+----------------+
    | Field    | Type        | Null | Key | Default | Extra          |
    +----------+-------------+------+-----+---------+----------------+
    | id       | int(11)     | NO   | PRI | NULL    | auto_increment |
    | username | varchar(32) | NO   | UNI | NULL    |                |
    | password | varchar(32) | NO   |     | NULL    |                |
    | sex      | int(11)     | YES  |     | 0       |                |
    | phone    | varchar(18) | YES  |     | NULL    |                |
    +----------+-------------+------+-----+---------+----------------+
    5 rows in set (0.00 sec)
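    The sink code below uses a `User` type whose definition the post never shows. What follows is a minimal sketch, assuming `User` is a case class whose four fields match the non-`id` columns (`id` is filled by auto_increment). The hypothetical `UserColumns.violations` helper mirrors the `varchar` limits from `desc user`. With these limits you can spot rows that a strict-mode MySQL server would reject before they ever reach the sink. It cannot check the UNIQUE key on `username`: a duplicate username still makes the insert fail with a `SQLException`.

```scala
// Hypothetical Scala mirror of the `user` table; `id` is omitted because MySQL
// fills it (auto_increment). `sex` is an Int, so it is always sent (never NULL).
case class User(username: String, password: String, sex: Int, phone: String)

object UserColumns {
  // Limits copied from `desc user`
  val UsernameMax = 32 // varchar(32), NOT NULL, UNIQUE
  val PasswordMax = 32 // varchar(32), NOT NULL
  val PhoneMax = 18    // varchar(18), nullable

  // Names of the columns this row would violate; an empty list means it should fit.
  // MySQL counts varchar(N) in characters; String.length counts UTF-16 code units,
  // which matches for characters in the Basic Multilingual Plane.
  def violations(u: User): List[String] = {
    val problems = List.newBuilder[String]
    if (u.username == null || u.username.length > UsernameMax) problems += "username"
    if (u.password == null || u.password.length > PasswordMax) problems += "password"
    if (u.phone != null && u.phone.length > PhoneMax) problems += "phone"
    problems.result()
  }
}
```

    A sink could log and skip rows with a non-empty result instead of letting one oversized field fail the job.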

    The execution flow is as follows:

      kafka source -> map -> mysqlSink

    1. Extending RichSinkFunction

      The main code is as follows:

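    import org.apache.flink.streaming.api.scala._
    
    // Each Kafka record is a CSV line: username,password,sex,phone
    env.addSource(source)
            .map(li => {
              val tmp = li.split(",")
              new User(tmp(0), tmp(1), tmp(2).toInt, tmp(3))
            })
            .addSink(new MysqlSink)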
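    The `map` above throws on any line that does not have four fields or whose third field is not an integer, and that exception fails the whole job. The following is a minimal sketch of a safer parser. It assumes the same `username,password,sex,phone` field order, and it also assumes `User` is a case class with those four fields, since the post does not show its definition. `UserParser` is a hypothetical name. It returns `None` for malformed lines instead of throwing. It uses `Try` rather than `toIntOption` so that it also compiles on Scala 2.12, which Flink's Scala API commonly targets.

```scala
import scala.util.Try

// Assumed shape of User: the original post never shows its definition.
case class User(username: String, password: String, sex: Int, phone: String)

object UserParser {
  // Parses "username,password,sex,phone" into a User.
  // Returns None for malformed lines instead of throwing, so a single bad
  // Kafka message does not fail the job.
  def parse(line: String): Option[User] =
    line.split(",", -1) match {
      // limit -1 keeps trailing empty fields, e.g. an empty phone
      case Array(name, pwd, sex, phone) =>
        Try(sex.trim.toInt).toOption.map(s => User(name.trim, pwd.trim, s, phone.trim))
      case _ => None
    }
}
```

    In the job, `.flatMap(line => UserParser.parse(line).toList)` could take the place of the `map`, which silently drops malformed lines. Whether to drop them, log them, or route them to a side output is a design choice.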

    MysqlSink:

    import java.sql.{Connection, DriverManager, PreparedStatement, SQLException}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
    import org.slf4j.{Logger, LoggerFactory}
    
    class MysqlSink extends RichSinkFunction[User] {
    
      // @transient: created on the TaskManager, never serialized with the function
      @transient private lazy val logger: Logger = LoggerFactory.getLogger(classOf[MysqlSink])
      @transient private var conn: Connection = _
      @transient private var ps: PreparedStatement = _
      val jdbcUrl = "jdbc:mysql://192.168.229.128:3306?useSSL=false&allowPublicKeyRetrieval=true"
      val username = "root"
      val password = "123456"
      // Connector/J 8.x class name; Connector/J 5.x uses com.mysql.jdbc.Driver
      val driverName = "com.mysql.cj.jdbc.Driver"
      val insertSql = "insert into async.user(username, password, sex, phone) values(?,?,?,?)"
    
      // Called once per parallel sink instance, before the first record
      override def open(parameters: Configuration): Unit = {
        try {
          Class.forName(driverName)
          conn = DriverManager.getConnection(jdbcUrl, username, password)
          // Disable auto-commit; invoke() commits explicitly
          conn.setAutoCommit(false)
          // Prepare once and reuse for every record (no statement leak per record)
          ps = conn.prepareStatement(insertSql)
        } catch {
          case e @ (_: ClassNotFoundException | _: SQLException) =>
            logger.error("init mysql error", e)
            // Fail the task and let Flink's restart strategy handle it;
            // System.exit would kill the whole TaskManager JVM
            throw e
        }
      }
    
      /**
        * Writes and commits one record per call. If throughput is insufficient, buffer records
        * in state and commit them in batches (an OOM then usually means too much data is
        * buffered and not released in time).
        */
      override def invoke(user: User, context: SinkFunction.Context[_]): Unit = {
        logger.debug("get user : {}", user)
        ps.setString(1, user.username)
        ps.setString(2, user.password)
        ps.setInt(3, user.sex)
        ps.setString(4, user.phone)
        ps.executeUpdate()
        conn.commit()
      }
    
      override def close(): Unit = {
        if (ps != null) ps.close()
        if (conn != null) {
          conn.commit()
          conn.close()
        }
      }
    }

    2. Implementing the OutputFormat interface

      The main code is as follows:

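    import org.apache.flink.streaming.api.scala._
    
    env.addSource(source)
            .map(li => {
              val tmp = li.split(",")
              new User(tmp(0), tmp(1), tmp(2).toInt, tmp(3))
            })
            // MysqlSink1 is an OutputFormat, not a SinkFunction, so it is attached with writeUsingOutputFormat
            .writeUsingOutputFormat(new MysqlSink1)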

    MysqlSink1:

    import java.sql.{Connection, DriverManager, PreparedStatement, SQLException}
    import org.apache.flink.api.common.io.OutputFormat
    import org.apache.flink.configuration.Configuration
    import org.slf4j.{Logger, LoggerFactory}
    
    class MysqlSink1 extends OutputFormat[User] {
    
      // @transient: created on the TaskManager, never serialized with the format
      @transient private lazy val logger: Logger = LoggerFactory.getLogger(classOf[MysqlSink1])
      @transient private var conn: Connection = _
      @transient private var ps: PreparedStatement = _
      val jdbcUrl = "jdbc:mysql://192.168.229.128:3306?useSSL=false&allowPublicKeyRetrieval=true"
      val username = "root"
      val password = "123456"
      // Connector/J 8.x class name; Connector/J 5.x uses com.mysql.jdbc.Driver
      val driverName = "com.mysql.cj.jdbc.Driver"
      val insertSql = "insert into async.user(username, password, sex, phone) values(?,?,?,?)"
    
      override def configure(parameters: Configuration): Unit = {
        // not needed: connection settings are hard-coded above
      }
    
      // Called once per parallel instance; taskNumber/numTasks identify the subtask
      override def open(taskNumber: Int, numTasks: Int): Unit = {
        try {
          Class.forName(driverName)
          conn = DriverManager.getConnection(jdbcUrl, username, password)
          // Disable auto-commit; writeRecord() commits explicitly
          conn.setAutoCommit(false)
          // Prepare once and reuse for every record
          ps = conn.prepareStatement(insertSql)
        } catch {
          case e @ (_: ClassNotFoundException | _: SQLException) =>
            logger.error("init mysql error", e)
            // Fail the task instead of calling System.exit on the TaskManager
            throw e
        }
      }
    
      override def writeRecord(user: User): Unit = {
        logger.debug("get user : {}", user)
        ps.setString(1, user.username)
        ps.setString(2, user.password)
        ps.setInt(3, user.sex)
        ps.setString(4, user.phone)
        ps.executeUpdate()
        conn.commit()
      }
    
      override def close(): Unit = {
        if (ps != null) ps.close()
        if (conn != null) {
          conn.commit()
          conn.close()
        }
      }
    }

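    It is fairly simple, so I won't post the test results. If throughput is high, be sure to switch to batch commits instead of committing every record.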
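    Both sinks above run one `execute` and one `commit` per record. Below is a minimal sketch of batch commits, assuming the same `User` fields and `async.user` table. `BatchBuffer` and `JdbcBatch` are hypothetical names, not part of Flink or the original post. The buffer hands every `batchSize` records to a flush function. The JDBC flush uses the standard `addBatch`/`executeBatch` calls and commits once per batch.

```scala
import java.sql.Connection
import scala.collection.mutable.ArrayBuffer

// Assumed shape of User (the post does not show its definition).
case class User(username: String, password: String, sex: Int, phone: String)

// Hypothetical helper: collects records in memory and hands them to `flush`
// every time `batchSize` records have accumulated.
class BatchBuffer[T](batchSize: Int, flush: Seq[T] => Unit) {
  require(batchSize > 0, "batchSize must be positive")
  private val buffer = ArrayBuffer.empty[T]

  def add(record: T): Unit = {
    buffer += record
    if (buffer.size >= batchSize) flushNow()
  }

  // Flushes whatever is buffered; call it from the sink's close() so the last
  // partial batch is written too. The buffer is cleared only after `flush`
  // returns normally.
  def flushNow(): Unit =
    if (buffer.nonEmpty) {
      flush(buffer.toList)
      buffer.clear()
    }
}

// Hypothetical JDBC flush: one executeBatch and one commit per batch
// instead of one execute and one commit per record.
object JdbcBatch {
  val InsertSql = "insert into async.user(username, password, sex, phone) values(?,?,?,?)"

  def writeUsers(conn: Connection, users: Seq[User]): Unit = {
    val ps = conn.prepareStatement(InsertSql)
    try {
      users.foreach { u =>
        ps.setString(1, u.username)
        ps.setString(2, u.password)
        ps.setInt(3, u.sex)
        ps.setString(4, u.phone)
        ps.addBatch()
      }
      ps.executeBatch()
      conn.commit()
    } finally ps.close()
  }
}
```

    Inside MysqlSink, `open()` could create `new BatchBuffer[User](500, users => JdbcBatch.writeUsers(conn, users))`, where 500 is an arbitrary example size. `invoke()` would call `add(user)`, and `close()` would call `flushNow()` before closing the connection. The buffer lives only in memory, so any records still buffered when a task fails are lost. A production version would also flush (or persist the buffer) in `snapshotState` of Flink's `CheckpointedFunction`. That is where the code comment's suggestion of keeping records in state leads. With MySQL Connector/J, adding `rewriteBatchedStatements=true` to the JDBC URL lets the driver send each batch as multi-row inserts.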

    Done.

  • Original article: https://www.cnblogs.com/Springmoon-venn/p/11223891.html