zoukankan      html  css  js  c++  java
  • FLINK实例(4): CONNECTORS(3)MySQL读写

    1 工程目录结构

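     pom.xml（注意：代码中通过 Class.forName 加载的 com.mysql.jdbc.Driver 来自 MySQL JDBC 驱动 mysql-connector-java，该依赖也需要加入 pom.xml）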

            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.56</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-jdbc_2.11</artifactId>
                <version>1.9.2</version>
            </dependency>

    2 flink 读取MySQL

    1) 通过自定义 Source 实现

    MySQLSource

    package com.atguigu.flink.source
    
    import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
    
    import com.atguigu.flink.bean.SensorReading
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
    
    /**
     * Bounded Flink source that reads rows from the MySQL table `sensor`
     * (at most 5, per the query) and emits each one as a [[SensorReading]].
     *
     * Lifecycle:
     *  - `open` creates the JDBC connection and prepared statement.
     *  - `run` executes the query and emits rows.
     *  - `cancel` only asks `run` to stop.
     *  - `close` releases the JDBC resources.
     */
    class MySQLSource extends RichSourceFunction[SensorReading] {
      var conn: Connection = null
      var ps: PreparedStatement = null

      // Written by cancel() and read by run(); Flink invokes them from different threads.
      @volatile private var isRunning: Boolean = true

      // Called once before run(): load the driver, connect, and prepare the query.
      override def open(parameters: Configuration): Unit = {
        Class.forName("com.mysql.jdbc.Driver")
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/gmall1122?useSSL=false","root","123456")
        ps = conn.prepareStatement("select * from sensor limit 5")
      }

      /**
       * Executes the query once and emits every row until the result set is exhausted
       * or the source is cancelled. SQL errors are not swallowed: they propagate so the
       * task fails visibly instead of finishing with no data.
       */
      override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
        val resultSet: ResultSet = ps.executeQuery()
        try {
          while (isRunning && resultSet.next()) {
            val id: String = resultSet.getString("id")
            val curTime: Long = resultSet.getLong("timestamp")
            val timepreture: Double = resultSet.getDouble("timepreture")
            sourceContext.collect(SensorReading(id, curTime, timepreture))
          }
        } finally {
          resultSet.close()
        }
      }

      // Only signals run() to stop; JDBC resources are released in close().
      override def cancel(): Unit = {
        isRunning = false
      }

      /**
       * Releases the statement and then the connection that owns it.
       * Close failures are ignored so that one failing close() does not prevent the other.
       */
      override def close(): Unit = {
        import scala.util.control.NonFatal

        def closeQuietly(resource: AutoCloseable): Unit =
          if (resource != null) {
            try resource.close()
            catch { case NonFatal(_) => () }
          }

        try {
          closeQuietly(ps)
          closeQuietly(conn)
        } finally {
          ps = null
          conn = null
          super.close()
        }
      }
    }

    主程序入口 MySQLSourceSinkApp

    package com.atguigu.flink.app
    
    import com.atguigu.flink.bean.SensorReading
    import com.atguigu.flink.sink.MySQLSink
    import com.atguigu.flink.source.MySQLSource
    import org.apache.flink.streaming.api.datastream.DataStream
    import org.apache.flink.streaming.api.scala
    import org.apache.flink.streaming.api.scala._
    
    
    /**
     * Streaming job that reads sensor rows from MySQL via [[MySQLSource]],
     * writes each one back via [[MySQLSink]], and also prints it to stdout.
     */
    object MySQLSourceSinkApp {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // Run every operator with a single parallel instance.
        env.setParallelism(1)

        // Input: rows produced by the custom JDBC source.
        val sensorStream: scala.DataStream[SensorReading] = env.addSource(new MySQLSource)

        // Two consumers of the same stream: the MySQL sink and stdout.
        sensorStream.addSink(new MySQLSink())
        sensorStream.print()

        // Build the job graph and run it.
        env.execute()
      }
    }

    2) 通过 JDBCInputFormat方式

        
      // Query handed to readMysql; its columns must line up with the (String, Long, Double) RowTypeInfo.
      val sql_read: String = "select * from sensor limit 5"

    /**
     * Reads the rows returned by `sql` through Flink's JDBCInputFormat and converts
     * them to [[SensorReading]]s.
     *
     * @param env    batch execution environment that creates the input
     * @param url    JDBC URL
     * @param driver JDBC driver class name
     * @param user   database user
     * @param pwd    database password
     * @param sql    query whose columns, in order, match the RowTypeInfo below
     *               (String, Long, Double)
     * @return a DataSet that is only evaluated when the plan is executed
     *         (e.g. by print() or env.execute())
     */
    def readMysql(env:ExecutionEnvironment,url: String, driver: String, user: String, pwd: String, sql: String): DataSet[SensorReading] = {
      // Read the query result as generic Rows.
      val dataResult: DataSet[Row] = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername(driver)
        .setDBUrl(url)
        .setUsername(user)
        .setPassword(pwd)
        .setQuery(sql)
        .setRowTypeInfo(new RowTypeInfo(
          BasicTypeInfo.STRING_TYPE_INFO,
          BasicTypeInfo.LONG_TYPE_INFO,
          BasicTypeInfo.DOUBLE_TYPE_INFO))
        .finish())

      // Convert each Row into the domain case class.
      dataResult.map { row =>
        // Row fields are untyped Objects. These casts assume the driver returns
        // String / java.lang.Long / java.lang.Double for these columns — confirm
        // against the table schema (e.g. an INT column would yield Integer).
        val id = row.getField(0).asInstanceOf[String]
        val timestamp = row.getField(1).asInstanceOf[Long]
        val timepreture = row.getField(2).asInstanceOf[Double]
        SensorReading(id, timestamp, timepreture)
      }
    }

    主程序入口MySQLSourceSinkApp2

    package com.atguigu.flink.app
    
    import com.atguigu.flink.bean.SensorReading
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
    import org.apache.flink.types.Row
    
    /**
     * Batch job that reads up to 5 rows of the MySQL table `sensor` through
     * JDBCInputFormat and prints them.
     */
    object MySQLSourceSinkApp2 {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val driver = "com.mysql.jdbc.Driver"
        val url = "jdbc:mysql://localhost:3306/gmall1122?useSSL=false"
        val username = "root"
        val password = "123456"
        val sql_read = "select * from sensor limit 5"

        // Define the MySQL read. DataSet transformations are lazy, so nothing is queried yet.
        val readStream = readMysql(env, url, driver, username, password, sql_read)

        // print() triggers execution and prints the rows on the client. Without a
        // sink or print(), the plan above would never run.
        readStream.print()
      }

      /**
       * Reads the rows returned by `sql` through JDBCInputFormat and converts them
       * to [[SensorReading]]s.
       *
       * @param sql query whose columns, in order, match (String, Long, Double)
       * @return a lazily evaluated DataSet
       */
      private def readMysql(env: ExecutionEnvironment, url: String, driver: String, user: String, pwd: String, sql: String): DataSet[SensorReading] = {
        val dataResult: DataSet[Row] = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
          .setDrivername(driver)
          .setDBUrl(url)
          .setUsername(user)
          .setPassword(pwd)
          .setQuery(sql)
          .setRowTypeInfo(new RowTypeInfo(
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.LONG_TYPE_INFO,
            BasicTypeInfo.DOUBLE_TYPE_INFO))
          .finish())

        // Convert generic Rows into the domain case class. The casts assume the
        // driver returns String / java.lang.Long / java.lang.Double — confirm
        // against the table schema.
        dataResult.map { row =>
          val id = row.getField(0).asInstanceOf[String]
          val timestamp = row.getField(1).asInstanceOf[Long]
          val timepreture = row.getField(2).asInstanceOf[Double]
          SensorReading(id, timestamp, timepreture)
        }
      }
    }

    3 flink 写入 MySQL

    1) 通过自定义 Sink 实现

    MySQLSink

    package com.atguigu.flink.sink
    
    import java.sql.{Connection, DriverManager, PreparedStatement}
    
    import com.atguigu.flink.bean.SensorReading
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
    
    /**
     * Flink sink that inserts each [[SensorReading]] into the MySQL table `sensor`.
     *
     * `open` creates one JDBC connection and prepared statement per sink instance.
     * `close` releases them.
     */
    class MySQLSink extends RichSinkFunction[SensorReading]{
      var conn:Connection = null
      var ps:PreparedStatement = null
      val INSERT_CASE:String = "INSERT INTO sensor (id, timestamp,timepreture) " + "VALUES (?, ?, ?) "

      // Load the driver, connect, and prepare the INSERT once per instance.
      override def open(parameters: Configuration): Unit = {
        Class.forName("com.mysql.jdbc.Driver")
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/gmall1122?useSSL=false","root","123456")
        ps = conn.prepareStatement(INSERT_CASE)
      }

      /**
       * Inserts one record immediately. A one-row batch (addBatch + executeBatch)
       * gives no batching benefit, so executeUpdate is used instead.
       * SQL errors propagate so the task fails instead of silently dropping the record.
       */
      override def invoke(value:SensorReading): Unit = {
        ps.setString(1, value.id)
        ps.setLong(2, value.timestamp)
        ps.setDouble(3, value.timepreture)
        ps.executeUpdate()
      }

      /**
       * Releases the statement and then the connection that owns it.
       * Close failures are ignored so that one failing close() does not prevent the other.
       */
      override def close(): Unit = {
        import scala.util.control.NonFatal

        def closeQuietly(resource: AutoCloseable): Unit =
          if (resource != null) {
            try resource.close()
            catch { case NonFatal(_) => () }
          }

        try {
          closeQuietly(ps)
          closeQuietly(conn)
        } finally {
          ps = null
          conn = null
          super.close()
        }
      }
    }

    主程序入口MySQLSourceSinkApp

    package com.atguigu.flink.app
    
    import com.atguigu.flink.bean.SensorReading
    import com.atguigu.flink.sink.MySQLSink
    import com.atguigu.flink.source.MySQLSource
    import org.apache.flink.streaming.api.datastream.DataStream
    import org.apache.flink.streaming.api.scala
    import org.apache.flink.streaming.api.scala._
    
    
    /**
     * Streaming job that reads sensor rows from MySQL via [[MySQLSource]],
     * writes each one back via [[MySQLSink]], and also prints it to stdout.
     */
    object MySQLSourceSinkApp {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // Run every operator with a single parallel instance.
        env.setParallelism(1)

        // Input: rows produced by the custom JDBC source.
        val sensorStream: scala.DataStream[SensorReading] = env.addSource(new MySQLSource)

        // Two consumers of the same stream: the MySQL sink and stdout.
        sensorStream.addSink(new MySQLSink())
        sensorStream.print()

        // Build the job graph and run it.
        env.execute()
      }
    }

    2)  通过JDBCOutputFormat

    Flink 中没有现成的、用来写入 MySQL 的 Sink，但 flink-jdbc 提供了 JDBCOutputFormat 类：只要提供 JDBC 驱动（driver），就可以把它当作 Sink 使用。

    JDBCOutputFormat 其实属于 Flink 的 batch（DataSet）API，但也可以在 stream（DataStream）API 中使用，社区也推荐通过这种方式来进行。

        // Parameterized INSERT; each Row's three fields are bound to the "?" placeholders in order.
        val sql_write: String = "insert into sensor (id, timestamp,timepreture) values(?,?,?)"
    
    
        /**
         * Writes every Row of `outputData` to a JDBC database using `sql`, then runs the batch job.
         *
         * Each Row's fields are bound, in order, to the `?` placeholders of `sql`.
         * `env.execute` blocks until the job finishes (in the default attached mode),
         * so the success message is printed only after the write has completed.
         *
         * @param env        environment that owns `outputData`; executed here
         * @param outputData rows to insert
         * @param url        JDBC URL
         * @param user       database user
         * @param pwd        database password
         * @param sql        parameterized INSERT statement
         * @param driver     JDBC driver class name (defaults to the MySQL driver)
         */
        def writeMysql(env: ExecutionEnvironment, outputData: DataSet[Row], url: String, user: String, pwd: String, sql: String,
                       driver: String = "com.mysql.jdbc.Driver"): Unit = {
          outputData.output(JDBCOutputFormat.buildJDBCOutputFormat()
            .setDrivername(driver)
            .setDBUrl(url)
            .setUsername(user)
            .setPassword(pwd)
            .setQuery(sql)
            .finish())
          env.execute("insert data to mysql")
          println("data write successfully")
        }

    主程序入口MySQLSourceSinkApp2

    package com.atguigu.flink.app
    
    import com.atguigu.flink.bean.SensorReading
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
    import org.apache.flink.types.Row
    
    /**
     * Batch job that reads up to 5 rows of the MySQL table `sensor` through
     * JDBCInputFormat and inserts them back through JDBCOutputFormat.
     */
    object MySQLSourceSinkApp2 {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val driver = "com.mysql.jdbc.Driver"
        val url = "jdbc:mysql://localhost:3306/gmall1122?useSSL=false"
        val username = "root"
        val password = "123456"
        val sql_read = "select * from sensor limit 5"
        val sql_write = "insert into sensor (id, timestamp,timepreture) values(?,?,?)"

        // Read MySQL data.
        val readStream = readMysql(env, url, driver, username, password, sql_read)

        // Convert each record into the Row layout JDBCOutputFormat expects: field i
        // is bound to the (i + 1)-th "?" placeholder of sql_write.
        val outputData: DataSet[Row] = readStream.map { x =>
          val row = new Row(3)
          row.setField(0, x.id)
          row.setField(1, x.timestamp)
          row.setField(2, x.timepreture)
          row
        }

        // Insert the rows into MySQL; this also executes the job.
        writeMysql(env, outputData, url, username, password, sql_write, driver)
      }

      /**
       * Reads the rows returned by `sql` through JDBCInputFormat and converts them
       * to [[SensorReading]]s.
       *
       * @param sql query whose columns, in order, match (String, Long, Double)
       * @return a lazily evaluated DataSet
       */
      private def readMysql(env: ExecutionEnvironment, url: String, driver: String, user: String, pwd: String, sql: String): DataSet[SensorReading] = {
        val dataResult: DataSet[Row] = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
          .setDrivername(driver)
          .setDBUrl(url)
          .setUsername(user)
          .setPassword(pwd)
          .setQuery(sql)
          .setRowTypeInfo(new RowTypeInfo(
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.LONG_TYPE_INFO,
            BasicTypeInfo.DOUBLE_TYPE_INFO))
          .finish())

        // Convert generic Rows into the domain case class. The casts assume the
        // driver returns String / java.lang.Long / java.lang.Double — confirm
        // against the table schema.
        dataResult.map { row =>
          val id = row.getField(0).asInstanceOf[String]
          val timestamp = row.getField(1).asInstanceOf[Long]
          val timepreture = row.getField(2).asInstanceOf[Double]
          SensorReading(id, timestamp, timepreture)
        }
      }

      /**
       * Writes every Row of `outputData` with the parameterized `sql`, then runs the batch job.
       * The success message is printed after env.execute returns.
       */
      private def writeMysql(env: ExecutionEnvironment, outputData: DataSet[Row], url: String, user: String, pwd: String, sql: String,
                             driver: String = "com.mysql.jdbc.Driver"): Unit = {
        outputData.output(JDBCOutputFormat.buildJDBCOutputFormat()
          .setDrivername(driver)
          .setDBUrl(url)
          .setUsername(user)
          .setPassword(pwd)
          .setQuery(sql)
          .finish())
        env.execute("insert data to mysql")
        println("data write successfully")
      }
    }

    4 scala读取 MySQL

    MysqlUtil

    package com.atguigu.flink.utils
    
    import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData, Statement}
    
    import com.alibaba.fastjson.JSONObject
    
    import scala.collection.mutable.ListBuffer
    
    /** Minimal JDBC helper that runs a query against the local MySQL database and returns rows as JSON objects. */
    object MysqlUtil {
      // Demo: print the first 5 rows of `sensor`.
      def main(args: Array[String]): Unit = {
        val list: List[JSONObject] = queryList("select * from sensor limit 5")
        println(list)
      }

      /**
       * Executes `sql` and returns every row as a JSONObject keyed by column label.
       *
       * The result set, statement and connection are always closed, even when the
       * query or the row conversion throws.
       *
       * @param sql query to execute; it is sent as-is, so it must not be built from untrusted input
       * @return the rows in result-set order
       */
      def queryList(sql: String): List[JSONObject] = {
        Class.forName("com.mysql.jdbc.Driver")
        withResource(DriverManager.getConnection("jdbc:mysql://localhost:3306/gmall1122?useSSL=false", "root", "123456")) { conn =>
          withResource(conn.createStatement()) { stat =>
            withResource(stat.executeQuery(sql))(readRows)
          }
        }
      }

      // Converts every remaining row of `rs` to a JSONObject (column label -> value).
      private def readRows(rs: ResultSet): List[JSONObject] = {
        val md: ResultSetMetaData = rs.getMetaData
        val columnCount = md.getColumnCount
        val rows = new ListBuffer[JSONObject]()
        while (rs.next()) {
          val rowData = new JSONObject()
          for (i <- 1 to columnCount) {
            // getColumnLabel honours "AS" aliases; without an alias it equals the column name.
            rowData.put(md.getColumnLabel(i), rs.getObject(i))
          }
          rows += rowData
        }
        rows.toList
      }

      // Runs `body` with `resource` and always closes it afterwards (scala.util.Using needs Scala 2.13).
      private def withResource[R <: AutoCloseable, A](resource: R)(body: R => A): A =
        try body(resource) finally resource.close()
    }

    5 Flink + Kafka + MySQL 实现 exactly-once

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13680290.html

  • 相关阅读:
    基于注解的mybatis(转)
    git分支删除
    java多线程同步(转)
    hadoop学习笔记(五):java api 操作hdfs
    java常用设计模式一:单例模式
    mysql CONCAT用法
    mysql date_sub用法
    hadoop学习笔记(四):hdfs常用命令
    try-catch+thows异常范围说明
    Python 类的多态
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13680290.html
Copyright © 2011-2022 走看看