zoukankan      html  css  js  c++  java
  • Flink输出到JDBC

    1.代码

    import java.sql.{Connection, DriverManager, PreparedStatement}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    //温度传感器读取样例类
    case class SensorReading(id: String, timestamp: Long, temperature: Double)

    object JdbcSinkTest {
    def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    //source
    val inputStream = env.readTextFile("sensor1.txt")
    //transform
    import org.apache.flink.api.scala._
    val dataStream = inputStream.map(x => {
    val arr = x.split(",")
    SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
    })
    //sink
    dataStream.addSink(new MyJdbcSink())
    env.execute("jdbc sink test")
    }
    }

    class MyJdbcSink() extends RichSinkFunction[SensorReading] {
    //定义sql连接、预编译器
    var conn: Connection = _
    var insertStmt: PreparedStatement = _
    var updateStmt: PreparedStatement = _

    //初始化 创建连接和预编译语句
    override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "用户名", "密码") //test库
    insertStmt = conn.prepareStatement("insert into temperatures(sensor, temp) values (?, ?)")
    updateStmt = conn.prepareStatement("update temperatures set temp = ? where sensor = ?")
    }

    //调用连接,执行sql
    override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    //执行更新语句
    updateStmt.setDouble(1, value.temperature) //1和2对应updateStmt里面sql的问号的位置
    updateStmt.setString(2, value.id)
    updateStmt.execute()
    //如果update没有查到数据,就执行插入操作
    if(updateStmt.getUpdateCount == 0) {
    insertStmt.setString(1, value.id)
    insertStmt.setDouble(2, value.temperature)
    insertStmt.execute()
    }
    }

    //关闭
    override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
    }

    }

    2.结果

    有帮助的欢迎评论打赏哈,谢谢!

  • 相关阅读:
    Hessian 服务端流程
    JSH面试感悟
    hibernate的update() 更新延迟或者无法更新,导致同个service调用存储过程执行方法不精确
    一个变量名引发的血案
    oracle for loop循环以及游标循环
    My97Datepicker 去掉 “不合法格式或超期范围”自动纠错限制
    获取前后n天的时间
    基于spring aop的操作日志功能
    为TIF、JPG图片添加地理坐标/平面直角坐标
    NGINX 中常规优化
  • 原文地址:https://www.cnblogs.com/wddqy/p/12176191.html
Copyright © 2011-2022 走看看