zoukankan      html  css  js  c++  java
  • Flink学习(十二) Sink到JDBC(可扩展到任何关系型数据库)

    导入依赖

            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.46</version>
            </dependency>

    编译代码

    package com.wyh.streamingApi.sink
    
    import java.sql.{Connection, DriverManager, PreparedStatement}
    
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
    import org.apache.flink.streaming.api.scala._
    
    object Sink2JDBC {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        //Source操作
        val inputStream = env.readTextFile("F:\flink-study\wyhFlinkSD\data\sensor.txt")
    
        //Transform操作
        val dataStream: DataStream[SensorReading] = inputStream.map(data => {
          val dataArray = data.split(",")
          SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
        })
    
        //Sink操作
        dataStream.addSink(new MyJDBCSink())
    
        env.execute("JDBC sink Test")
    
      }
    
    }
    
    class MyJDBCSink() extends RichSinkFunction[SensorReading]{
      //连接、定义预编译器
      var conn:Connection = _
      var insertStmt:PreparedStatement = _
      var updateStmt:PreparedStatement = _
    
      //初始化 建立
      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","root")
        insertStmt = conn.prepareStatement("insert into temperatures (sensor,temp) values (?,?)")
        updateStmt = conn.prepareStatement("update temperatures set temp = ? where sensor = ?")
      }
    
      //调用连接执行sql
      override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
        //执行更新语句
        updateStmt.setDouble(1,value.temperature)
        updateStmt.setString(2,value.id)
        updateStmt.execute()
    
        //如果update没有查到数据,执行插入语句
        if(updateStmt.getUpdateCount == 0){
          insertStmt.setString(1,value.id)
          insertStmt.setDouble(2,value.temperature)
          insertStmt.execute()
        }
      }
    
      //关闭时 做清理工作
      override def close(): Unit = {
        conn.close()
        insertStmt.close()
        updateStmt.close()
      }
    }

    运行

  • 相关阅读:
    MYSQL profiling分析语句
    进程状态与僵尸进程、孤儿进程
    Nginx跨域设置
    Redis的应用场景
    Nginx的作用
    cgi、fast-cgi和php-fpm的介绍(作用)
    CoreDump开启和Swoole Tracker 3.0配置
    好题总结
    Atcoder总结 Part III
    Atcoder总结 Part II
  • 原文地址:https://www.cnblogs.com/wyh-study/p/12924855.html
Copyright © 2011-2022 走看看