zoukankan      html  css  js  c++  java
  • Flink学习(十二) Sink到JDBC(可扩展到任何关系型数据库)

    导入依赖

            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.46</version>
            </dependency>

    编译代码

    package com.wyh.streamingApi.sink
    
    import java.sql.{Connection, DriverManager, PreparedStatement}
    
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
    import org.apache.flink.streaming.api.scala._
    
    object Sink2JDBC {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        //Source操作
        val inputStream = env.readTextFile("F:\flink-study\wyhFlinkSD\data\sensor.txt")
    
        //Transform操作
        val dataStream: DataStream[SensorReading] = inputStream.map(data => {
          val dataArray = data.split(",")
          SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
        })
    
        //Sink操作
        dataStream.addSink(new MyJDBCSink())
    
        env.execute("JDBC sink Test")
    
      }
    
    }
    
    class MyJDBCSink() extends RichSinkFunction[SensorReading]{
      //连接、定义预编译器
      var conn:Connection = _
      var insertStmt:PreparedStatement = _
      var updateStmt:PreparedStatement = _
    
      //初始化 建立
      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","root")
        insertStmt = conn.prepareStatement("insert into temperatures (sensor,temp) values (?,?)")
        updateStmt = conn.prepareStatement("update temperatures set temp = ? where sensor = ?")
      }
    
      //调用连接执行sql
      override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
        //执行更新语句
        updateStmt.setDouble(1,value.temperature)
        updateStmt.setString(2,value.id)
        updateStmt.execute()
    
        //如果update没有查到数据,执行插入语句
        if(updateStmt.getUpdateCount == 0){
          insertStmt.setString(1,value.id)
          insertStmt.setDouble(2,value.temperature)
          insertStmt.execute()
        }
      }
    
      //关闭时 做清理工作
      override def close(): Unit = {
        conn.close()
        insertStmt.close()
        updateStmt.close()
      }
    }

    运行

  • 相关阅读:
    注册、登录、忘记密码实战
    python3错误:format() takes at most 2 arguments
    Charles手机抓包简要步骤
    VARCHAR2(N CHAR)与VARCHAR2(N)的区别
    关于VI一些常用的操作
    LINUX下 基于 Socket 的 UDP 和 TCP 编程具体实现
    VC++6.0实现文本格式的转换保存
    crt的sftp使用用于Windows与Linux之间的通讯
    pl/sql 导出脚本与使用
    在oracle10g下启动服务报 Permission denied错误解决方法
  • 原文地址:https://www.cnblogs.com/wyh-study/p/12924855.html
Copyright © 2011-2022 走看看