zoukankan      html  css  js  c++  java
  • flink下沉数据到mysql

    import java.sql.{Connection, Driver, DriverManager, PreparedStatement}

    import it.bigdata.flink.study.SensorReding
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
    import org.apache.flink.streaming.api.scala._

    object JdbcSinkTest {
    def main(args: Array[String]): Unit = {
    //创建环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    //读取数据
    val inputPath="D:\ideaDemo\maven_flink\src\main\resources\sensor.txt"
    val inputStream = env.readTextFile(inputPath)

    //简单转换
    val dataStream = inputStream.map(data => {
    var arr = data.split(",")
    SensorReding(arr(0), arr(1).toLong, arr(1).toDouble)
    })

    dataStream.addSink(new MyJdbcSinkFunc())


    env.execute("jdbc sink test" )
    }
    }

    class MyJdbcSinkFunc() extends RichSinkFunction[SensorReding]{
    //定义连接、预编译语句
    var conn:Connection =_
    var insetstmt:PreparedStatement =_
    var updateStmt:PreparedStatement =_


    override def open(parameters: Configuration): Unit = {
    conn=DriverManager.getConnection("jdbc:mysql://192.168.0.20:3309/test","root","123456")
    insetstmt = conn.prepareStatement("insert into tb_tmp_001(id,x) values(?,?)")
    updateStmt = conn.prepareStatement("update tb_tmp_001 set x=? where id = ?")
    }

    override def invoke(value: SensorReding): Unit = {
    //先执行更新操作,查到就更新
    updateStmt.setString(1,value.temperature.toString)
    updateStmt.setString(2,value.id)
    updateStmt.execute()
    //如果更新没有查到数据,那么就插入
    if(updateStmt.getUpdateCount==0){
    insetstmt.setString(1,value.id)
    insetstmt.setString(2,value.temperature.toString)
    insetstmt.execute()
    }
    }
    override def close(): Unit = {
    insetstmt.close()
    updateStmt.close()
    conn.close()
    }
    }
    author@nohert
  • 相关阅读:
    Flex【原创】移动设备相册图片浏览功能
    Flex SharedObject
    Flex Mobile applicationDPI 自适应
    Flex4.6【原创】移动设备拖曳、缩放、旋转手势并用(避免冲突)
    Flex Copy & Clone
    FlashBuilder 无法调试问题
    Flex【原创】惯性定位效果
    C#播放声音的两个方法 + 流读写文件
    ArrayList
    HDOJ1724椭圆
  • 原文地址:https://www.cnblogs.com/gzgBlog/p/14928283.html
Copyright © 2011-2022 走看看