zoukankan      html  css  js  c++  java
  • flink下沉数据到mysql

    import java.sql.{Connection, Driver, DriverManager, PreparedStatement}

    import it.bigdata.flink.study.SensorReding
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
    import org.apache.flink.streaming.api.scala._

    /**
     * Example Flink job: reads comma-separated sensor records from a text file,
     * converts each line into a `SensorReding` and writes the stream to MySQL
     * through [[MyJdbcSinkFunc]].
     */
    object JdbcSinkTest {

      /**
       * Job entry point.
       *
       * @param args command-line arguments (not used)
       */
      def main(args: Array[String]): Unit = {
        // Create the execution environment; parallelism 1 gives a single sink
        // instance and therefore a single JDBC connection.
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        // Read the input file. Backslashes must be escaped in an ordinary Scala
        // string literal, otherwise "\i", "\m", "\s" are rejected as invalid
        // escapes and "\r" silently turns into a carriage return.
        val inputPath = "D:\\ideaDemo\\maven_flink\\src\\main\\resources\\sensor.txt"
        val inputStream = env.readTextFile(inputPath)

        // Parse each line as "id,timestamp,temperature".
        val dataStream = inputStream.map { line =>
          val fields = line.split(",")
          // The temperature is the third field (index 2); index 1 is the timestamp.
          // Numeric fields are trimmed so that "id, 123, 1.0" style rows also parse.
          SensorReding(fields(0), fields(1).trim.toLong, fields(2).trim.toDouble)
        }

        dataStream.addSink(new MyJdbcSinkFunc())

        env.execute("jdbc sink test")
      }
    }

    /**
     * Sink that upserts sensor readings into the MySQL table `tb_tmp_001`:
     * it first tries to update the row with the reading's id and inserts a new
     * row only when the update touched nothing.
     *
     * The JDBC connection and both prepared statements are created in `open`
     * and released in `close`.
     */
    class MyJdbcSinkFunc() extends RichSinkFunction[SensorReding] {
      // Connection and prepared statements; assigned in open(), so they stay
      // null until open() has run successfully.
      var conn: Connection = _
      var insetstmt: PreparedStatement = _
      var updateStmt: PreparedStatement = _

      /**
       * Opens the MySQL connection and prepares the insert/update statements.
       *
       * @param parameters configuration passed in by the runtime (not used here)
       */
      override def open(parameters: Configuration): Unit = {
        // NOTE(review): URL and credentials are hard-coded; consider passing
        // them in through job parameters instead.
        conn = DriverManager.getConnection("jdbc:mysql://192.168.0.20:3309/test", "root", "123456")
        insetstmt = conn.prepareStatement("insert into tb_tmp_001(id,x) values(?,?)")
        updateStmt = conn.prepareStatement("update tb_tmp_001 set x=? where id = ?")
      }

      /**
       * Writes one reading: update the existing row, or insert it if absent.
       *
       * @param value the sensor reading to persist
       */
      override def invoke(value: SensorReding): Unit = {
        val temperature = value.temperature.toString

        // Try the update first; executeUpdate() returns the affected row count.
        updateStmt.setString(1, temperature)
        updateStmt.setString(2, value.id)
        val updatedRows = updateStmt.executeUpdate()

        // No row was updated for this id, so insert a new one.
        if (updatedRows == 0) {
          insetstmt.setString(1, value.id)
          insetstmt.setString(2, temperature)
          insetstmt.executeUpdate()
        }
      }

      /**
       * Releases the statements and the connection.
       *
       * Each resource is null-checked because `open` may have failed before
       * assigning it, and the nested try/finally ensures that a failure while
       * closing one resource does not prevent the remaining ones from closing.
       */
      override def close(): Unit = {
        try {
          if (insetstmt != null) insetstmt.close()
        } finally {
          try {
            if (updateStmt != null) updateStmt.close()
          } finally {
            if (conn != null) conn.close()
          }
        }
      }
    }
    author@nohert
  • 相关阅读:
    Python requests“Max retries exceeded with url” error
    命令行链接mongo、redis、mysql
    python 删除字典某个key(键)及对应值
    python标准模块(二)
    python标准模块(一)
    格式化输出
    LeetCode----1. Two Sum
    文件操作(初阶)
    python函数基础
    python3内置函数
  • 原文地址:https://www.cnblogs.com/gzgBlog/p/14928283.html
Copyright © 2011-2022 走看看