zoukankan      html  css  js  c++  java
  • flink操作mysql

    Flink读写mysql

    如果是mvn项目的话,需要预先导入相应的包:

            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-jdbc_2.11</artifactId>
                <version>1.9.2</version>
            </dependency> 
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.24</version>
            </dependency>

    1、读

    import java.time.LocalDateTime
    
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
    import org.apache.flink.types.Row
    import org.apache.log4j.Logger
    
    object OperatorMysql extends Logger("opeartorMysql") {
      val log = Logger.getLogger("opeartorMysql")
    
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val driver = "com.mysql.jdbc.Driver"
        val url = "jdbc:mysql://192.168.1.1:3306/ipvacloud?useUnicode=true&characterEncoding=utf-8"
        val username = "root"
        val password = "123456"
        log.info("--------read mysql-----------")
        val sql_read = "select relationid,year,month from reid_kequn_arave_times"
        readMysql(env, url, driver, username, password, sql_read)
    }
    /**
        * 读mysql
        *
        * @param env
        * @param url
        * @param user
        * @param pwd
        * @param sql
        */
      def readMysql(env: ExecutionEnvironment, url: String, driver: String, user: String, pwd: String, sql: String) = {
        val dataResult: DataSet[Row] = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
          .setDrivername(driver)
          .setDBUrl(url)
          .setUsername(user)
          .setPassword(pwd)
          .setQuery(sql)
          .setRowTypeInfo(new RowTypeInfo(
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO))
          .finish())
        dataResult.map(x => {
          val relationid = x.getField(0)
          val year = x.getField(1)
          val month = x.getField(2)
          (relationid, year, month)
        }).print()
      }

    运行结果:

    2、写

    import java.time.LocalDateTime
    
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
    import org.apache.flink.types.Row
    import org.apache.log4j.Logger
    
    /**
      * AUTHOR Guozy
      * DATE   2020/3/14-10:44
      **/
    object OperatorMysql extends Logger("opeartorMysql") {
      val log = Logger.getLogger("opeartorMysql")
    
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val driver = "com.mysql.jdbc.Driver"
        val url = "jdbc:mysql://192.168.1.1:3306/ipvacloud?useUnicode=true&characterEncoding=utf-8"
        val username = "root"
        val password = "winner@001"
        log.info("--------write mysql-----------")
        val sql_write = "insert into attention_newdata(Hostname,ChannelNum,enabled,LastTime,CreateTime,ModifyTime) values(?,?,?,?,?,?) on duplicate key update ModifyTime=NOW()"
        val curTime = LocalDateTime.now().toString.replace("T", " ")
        val outputData = env.fromElements(("ttt", "0", "1", curTime, curTime, curTime))
          .map(x => {
            val row = new Row(6)
            row.setField(0, x._1)
            row.setField(1, x._2)
            row.setField(2, x._3)
            row.setField(3, x._4)
            row.setField(4, x._5)
            row.setField(5, x._6)
            row
          })
        writeMysql(env, outputData, url, username, password, sql_write)
      }
    // 写mysql
      def writeMysql(env: ExecutionEnvironment, outputData: DataSet[Row], url: String, user: String, pwd: String, sql: String) = {
        outputData.output(JDBCOutputFormat.buildJDBCOutputFormat()
          .setDrivername("com.mysql.jdbc.Driver")
          .setDBUrl(url)
          .setUsername(user)
          .setPassword(pwd)
          .setQuery(sql)
          .finish())
        env.execute("insert data to mysql")
        print("data write successfully")
      }
    

    运行结果:

      

      

  • 相关阅读:
    视图的INSERT、UPDATE、DELETE注意事项
    SQL SERVER 用户管理 TSQL 命令
    SQL SERVER 利用存储过程查看角色和用户信息
    犯错了~
    配置tomcat
    python中的类继承之super
    python中参数解析
    python的几个内联函数:lambda ,zip,filter, map, reduce
    第一次性能测试http_load
    不能在 DropDownList 中选择多个项
  • 原文地址:https://www.cnblogs.com/Gxiaobai/p/12645497.html
Copyright © 2011-2022 走看看